@@ -35,11 +35,6 @@ import groovy.transform.TupleConstructor
3535import groovy.util.logging.Slf4j
3636import io.seqera.http.HxClient
3737import io.seqera.util.trace.TraceUtils
38- import io.seqera.tower.ApiClient
39- import io.seqera.tower.ApiException
40- import io.seqera.tower.api.DatasetsApi
41- import io.seqera.tower.model.CreateDatasetRequest
42- import io.seqera.tower.model.CreateDatasetResponse
4338import nextflow.BuildInfo
4439import nextflow.Session
4540import nextflow.container.resolver.ContainerMeta
@@ -106,8 +101,6 @@ class TowerClient implements TraceObserverV2 {
106101
107102 private HxClient httpClient
108103
109- private DatasetsApi datasetsApi
110-
111104 private JsonGenerator generator
112105
113106 private String workflowId
@@ -164,7 +157,6 @@ class TowerClient implements TraceObserverV2 {
164157 this . schema = loadSchema()
165158 this . generator = TowerJsonGenerator . create(schema)
166159 this . reports = new TowerReports (session)
167- this . datasetsApi = createDatasetsApi()
168160 }
169161
170162 TowerClient withEnvironment (Map env ) {
@@ -177,30 +169,6 @@ class TowerClient implements TraceObserverV2 {
177169 this . generator = TowerJsonGenerator . create(Collections . EMPTY_MAP )
178170 }
179171
180- /**
181- * Create and configure a DatasetsApi client for Seqera Platform
182- *
183- * @return Configured DatasetsApi instance
184- */
185- protected DatasetsApi createDatasetsApi () {
186- if ( ! accessToken || ! endpoint ) {
187- return null
188- }
189-
190- try {
191- def apiClient = new ApiClient ()
192- apiClient. setBasePath(endpoint)
193- apiClient. setBearerToken(accessToken)
194- apiClient. setUserAgent(" Nextflow/$BuildInfo . version " )
195-
196- return new DatasetsApi (apiClient)
197- }
198- catch ( Exception e ) {
199- log. warn " Failed to initialize DatasetsApi: ${ e.message} "
200- return null
201- }
202- }
203-
204172 @Override
205173 boolean enableMetrics () { true }
206174
@@ -298,6 +266,18 @@ class TowerClient implements TraceObserverV2 {
298266 return result
299267 }
300268
269+ protected String getUrlDatasets () {
270+ if ( workspaceId )
271+ return " $endpoint /workspaces/$workspaceId /datasets/"
272+ return " $endpoint /datasets/"
273+ }
274+
275+ protected String getUrlDatasetUpload (String datasetId ) {
276+ if ( workspaceId )
277+ return " $endpoint /workspaces/$workspaceId /datasets/$datasetId /upload"
278+ return " $endpoint /datasets/$datasetId /upload"
279+ }
280+
301281 /**
302282 * On workflow start, submit a message with some basic
303283 * information, like Id, activity and an ISO 8601 formatted
@@ -979,7 +959,7 @@ class TowerClient implements TraceObserverV2 {
979959 }
980960
981961 /**
982- * Create a new dataset in Seqera Platform using tower-java-sdk
962+ * Create a new dataset in Seqera Platform
983963 *
984964 * @param name The name for the new dataset
985965 * @param description The description for the new dataset
@@ -988,39 +968,40 @@ class TowerClient implements TraceObserverV2 {
988968 private String createDataset (String name , String description ) {
989969 log. info " Creating new dataset: ${ name} "
990970
991- if ( ! datasetsApi ) {
992- log. warn " DatasetsApi not initialized, cannot create dataset"
993- return null
994- }
995-
996971 try {
997- def request = new CreateDatasetRequest ()
998- request. setName(name)
999- request. setDescription(" Workflow output: ${ description} " )
1000-
1001- def wspId = workspaceId ? Long . valueOf(workspaceId) : null
1002- CreateDatasetResponse response = datasetsApi. createDataset(wspId, request)
972+ final payload = [
973+ name : name,
974+ description : " Workflow output: ${ description} " ,
975+ header : true
976+ ]
977+
978+ final url = getUrlDatasets()
979+ final resp = sendHttpMessage(url, payload, ' POST' )
980+
981+ if ( resp. isError() ) {
982+ log. warn " Failed to create dataset '${ name} ': ${ resp.message} "
983+ return null
984+ }
1003985
1004- def datasetId = response. dataset?. id?. toString()
986+ // Parse the response to extract dataset ID
987+ final json = new JsonSlurper (). parseText(resp. message) as Map
988+ final dataset = json. dataset as Map
989+ final datasetId = dataset?. id as String
1005990
1006991 if ( datasetId ) {
1007992 log. info " Created dataset '${ name} ' with ID: ${ datasetId} "
1008993 }
1009994
1010995 return datasetId
1011996 }
1012- catch ( ApiException e ) {
1013- log. warn " Failed to create dataset '${ name} ': ${ e.message} (status: ${ e.code} )" , e
1014- return null
1015- }
1016997 catch ( Exception e ) {
1017998 log. warn " Failed to create dataset '${ name} ': ${ e.message} " , e
1018999 return null
10191000 }
10201001 }
10211002
10221003 /**
1023- * Upload an index file to a dataset using tower-java-sdk
1004+ * Upload an index file to a dataset
10241005 *
10251006 * @param datasetId The ID of the dataset
10261007 * @param indexPath The path to the index file
@@ -1032,27 +1013,95 @@ class TowerClient implements TraceObserverV2 {
10321013 return
10331014 }
10341015
1035- if ( ! datasetsApi ) {
1036- log. warn " DatasetsApi not initialized, cannot upload index file"
1037- return
1038- }
1039-
10401016 log. info " Uploading index file for output '${ outputName} ' to dataset ${ datasetId} : ${ indexPath} "
10411017
10421018 try {
1043- def wspId = workspaceId ? Long . valueOf(workspaceId) : null
1044- def header = Boolean . TRUE // Workflow output index files always have headers
1019+ def url = getUrlDatasetUpload(datasetId)
1020+ // Workflow output index files always have headers
1021+ url + = " ?header=true"
10451022
1046- datasetsApi. uploadDataset(wspId, datasetId, header, indexPath. toFile())
1023+ // Upload file using multipart form data
1024+ final resp = uploadFile(url, indexPath. toFile())
10471025
1048- log. info " Successfully uploaded index file for output '${ outputName} ' to dataset ${ datasetId} "
1049- }
1050- catch ( ApiException e ) {
1051- log. warn " Failed to upload index file for output '${ outputName} ': ${ e.message} (status: ${ e.code} )" , e
1026+ if ( resp. isError() ) {
1027+ log. warn " Failed to upload index file for output '${ outputName} ': ${ resp.message} "
1028+ } else {
1029+ log. info " Successfully uploaded index file for output '${ outputName} ' to dataset ${ datasetId} "
1030+ }
10521031 }
10531032 catch ( Exception e ) {
10541033 log. warn " Failed to upload index file for output '${ outputName} ': ${ e.message} " , e
10551034 }
10561035 }
10571036
1037+ /**
1038+ * Upload a file to Seqera Platform using multipart/form-data
1039+ *
1040+ * @param url The upload URL
1041+ * @param file The file to upload
1042+ * @return Response object
1043+ */
1044+ protected Response uploadFile (String url , File file ) {
1045+ log. trace " HTTP multipart upload: url=$url ; file=${ file.name} "
1046+
1047+ try {
1048+ // Create multipart body
1049+ final boundary = " ----TowerNextflowBoundary" + System . currentTimeMillis()
1050+ final body = createMultipartBody(file, boundary)
1051+
1052+ // Build request
1053+ final request = HttpRequest . newBuilder(URI . create(url))
1054+ .header(' Content-Type' , " multipart/form-data; boundary=$boundary " )
1055+ .header(' User-Agent' , " Nextflow/$BuildInfo . version " )
1056+ .header(' Traceparent' , TraceUtils . rndTrace())
1057+ .POST (HttpRequest.BodyPublishers . ofByteArray(body))
1058+ .build()
1059+
1060+ final resp = httpClient. sendAsString(request)
1061+ final status = resp. statusCode()
1062+
1063+ if ( status == 401 ) {
1064+ return new Response (status, ' Unauthorized Seqera Platform API access' )
1065+ }
1066+ if ( status >= 400 ) {
1067+ final msg = parseCause(resp?. body()) ?: " Unexpected response for request $url "
1068+ return new Response (status, msg as String )
1069+ }
1070+
1071+ return new Response (status, resp. body())
1072+ }
1073+ catch ( IOException e ) {
1074+ return new Response (0 , " Unable to connect to Seqera Platform API: ${ getHostUrl(url)} " )
1075+ }
1076+ }
1077+
1078+ /**
1079+ * Create a multipart/form-data request body
1080+ *
1081+ * @param file The file to include in the request
1082+ * @param boundary The multipart boundary string
1083+ * @return Byte array containing the multipart body
1084+ */
1085+ private byte [] createMultipartBody (File file , String boundary ) {
1086+ final baos = new ByteArrayOutputStream ()
1087+ final writer = new PrintWriter (new OutputStreamWriter (baos, ' UTF-8' ), true )
1088+
1089+ // Write file part
1090+ writer. append(" --${ boundary} \r\n " )
1091+ writer. append(" Content-Disposition: form-data; name=\" file\" ; filename=\" ${ file.name} \"\r\n " )
1092+ writer. append(" Content-Type: text/csv\r\n " )
1093+ writer. append(" \r\n " )
1094+ writer. flush()
1095+
1096+ // Write file content
1097+ baos. write(file. bytes)
1098+
1099+ // Write closing boundary
1100+ writer. append(" \r\n " )
1101+ writer. append(" --${ boundary} --\r\n " )
1102+ writer. flush()
1103+
1104+ return baos. toByteArray()
1105+ }
1106+
10581107}
0 commit comments