@@ -51,7 +51,6 @@
 import org.apache.iceberg.mr.InputFormatConfig;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.Tasks;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -94,7 +93,7 @@ public void commitTask(TaskAttemptContext originalContext) throws IOException {
     TaskAttemptID attemptID = context.getTaskAttemptID();
     JobConf jobConf = context.getJobConf();
     Map<String, HiveIcebergRecordWriter> writers = HiveIcebergRecordWriter.getWriters(attemptID);
-    Collection<Pair<String, String>> outputs = HiveIcebergStorageHandler.outputTables(context.getJobConf());
+    Collection<String> outputs = HiveIcebergStorageHandler.outputTables(context.getJobConf());

     ExecutorService tableExecutor = tableExecutor(jobConf, outputs.size());
     try {
@@ -105,8 +104,8 @@ public void commitTask(TaskAttemptContext originalContext) throws IOException {
           .throwFailureWhenFinished()
           .executeWith(tableExecutor)
           .run(output -> {
-            Table table = HiveIcebergStorageHandler.table(context.getJobConf(), output.second());
-            HiveIcebergRecordWriter writer = writers.get(output.second());
+            Table table = HiveIcebergStorageHandler.table(context.getJobConf(), output);
+            HiveIcebergRecordWriter writer = writers.get(output);
             DataFile[] closedFiles = writer != null ? writer.dataFiles() : new DataFile[0];
             String fileForCommitLocation = generateFileForCommitLocation(table.location(), jobConf,
                 attemptID.getJobID(), attemptID.getTaskID().getId());
@@ -158,7 +157,7 @@ public void commitJob(JobContext originalContext) throws IOException {
     long startTime = System.currentTimeMillis();
     LOG.info("Committing job {} has started", jobContext.getJobID());

-    Collection<Pair<String, String>> outputs = HiveIcebergStorageHandler.outputTables(jobContext.getJobConf());
+    Collection<String> outputs = HiveIcebergStorageHandler.outputTables(jobContext.getJobConf());
     Collection<String> jobLocations = new ConcurrentLinkedQueue<>();

     ExecutorService fileExecutor = fileExecutor(jobConf);
@@ -170,9 +169,10 @@ public void commitJob(JobContext originalContext) throws IOException {
           .stopOnFailure()
           .executeWith(tableExecutor)
           .run(output -> {
-            Table table = HiveIcebergStorageHandler.table(jobConf, output.second());
+            Table table = HiveIcebergStorageHandler.table(jobConf, output);
+            String catalogName = HiveIcebergStorageHandler.catalogName(jobConf, output);
             jobLocations.add(generateJobLocation(table.location(), jobConf, jobContext.getJobID()));
-            commitTable(table.io(), fileExecutor, jobContext, output.second(), table.location(), output.first());
+            commitTable(table.io(), fileExecutor, jobContext, output, table.location(), catalogName);
           });
     } finally {
       fileExecutor.shutdown();
@@ -199,7 +199,7 @@ public void abortJob(JobContext originalContext, int status) throws IOException
     JobConf jobConf = jobContext.getJobConf();

     LOG.info("Job {} is aborted. Data file cleaning started", jobContext.getJobID());
-    Collection<Pair<String, String>> outputs = HiveIcebergStorageHandler.outputTables(jobContext.getJobConf());
+    Collection<String> outputs = HiveIcebergStorageHandler.outputTables(jobContext.getJobConf());
     Collection<String> jobLocations = new ConcurrentLinkedQueue<>();

     ExecutorService fileExecutor = fileExecutor(jobConf);
@@ -213,7 +213,7 @@ public void abortJob(JobContext originalContext, int status) throws IOException
           .run(output -> {
             LOG.info("Cleaning job for table {}", jobContext.getJobID(), output);

-            Table table = HiveIcebergStorageHandler.table(jobConf, output.second());
+            Table table = HiveIcebergStorageHandler.table(jobConf, output);
             jobLocations.add(generateJobLocation(table.location(), jobConf, jobContext.getJobID()));
             Collection<DataFile> dataFiles = dataFiles(fileExecutor, table.location(), jobContext, table.io(), false);
