3232import java .util .concurrent .Future ;
3333import java .util .concurrent .ScheduledExecutorService ;
3434import java .util .concurrent .TimeUnit ;
35+ import java .util .function .BiFunction ;
3536import java .util .function .Function ;
3637import java .util .function .Supplier ;
3738import org .apache .hadoop .conf .Configuration ;
@@ -98,7 +99,7 @@ public class RESTSessionCatalog extends BaseSessionCatalog
9899 OAuth2Properties .SAML1_TOKEN_TYPE );
99100
100101 private final Function <Map <String , String >, RESTClient > clientBuilder ;
101- private Function < Map <String , String >, FileIO > ioBuilder = null ;
102+ private final BiFunction < SessionContext , Map <String , String >, FileIO > ioBuilder ;
102103 private Cache <String , AuthSession > sessions = null ;
103104 private AuthSession catalogAuth = null ;
104105 private boolean keepTokenRefreshed = true ;
@@ -123,11 +124,15 @@ Map<String, String> params() {
123124 }
124125
125126 public RESTSessionCatalog () {
126- this (config -> HTTPClient .builder (config ).uri (config .get (CatalogProperties .URI )).build ());
127+ this (config -> HTTPClient .builder (config ).uri (config .get (CatalogProperties .URI )).build (), null );
127128 }
128129
129- RESTSessionCatalog (Function <Map <String , String >, RESTClient > clientBuilder ) {
130+ public RESTSessionCatalog (
131+ Function <Map <String , String >, RESTClient > clientBuilder ,
132+ BiFunction <SessionContext , Map <String , String >, FileIO > ioBuilder ) {
133+ Preconditions .checkNotNull (clientBuilder , "Invalid client builder: null" );
130134 this .clientBuilder = clientBuilder ;
135+ this .ioBuilder = ioBuilder ;
131136 }
132137
133138 @ Override
@@ -188,7 +193,7 @@ public void initialize(String name, Map<String, String> unresolved) {
188193 client , tokenRefreshExecutor (), token , expiresAtMillis (mergedProps ), catalogAuth );
189194 }
190195
191- this .io = newFileIO (mergedProps );
196+ this .io = newFileIO (SessionContext . createEmpty (), mergedProps );
192197
193198 this .snapshotMode =
194199 SnapshotMode .valueOf (
@@ -203,11 +208,6 @@ public void initialize(String name, Map<String, String> unresolved) {
203208 super .initialize (name , mergedProps );
204209 }
205210
206- public void setFileIOBuilder (Function <Map <String , String >, FileIO > newIOBuilder ) {
207- Preconditions .checkState (null == io , "Cannot set IO builder after calling initialize" );
208- this .ioBuilder = newIOBuilder ;
209- }
210-
211211 private AuthSession session (SessionContext context ) {
212212 AuthSession session =
213213 sessions .get (
@@ -350,7 +350,7 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) {
350350 client ,
351351 paths .table (loadedIdent ),
352352 session ::headers ,
353- tableFileIO (response .config ()),
353+ tableFileIO (context , response .config ()),
354354 tableMetadata );
355355
356356 TableIdentifier tableIdentifier = loadedIdent ;
@@ -605,7 +605,7 @@ public Table create() {
605605 client ,
606606 paths .table (ident ),
607607 session ::headers ,
608- tableFileIO (response .config ()),
608+ tableFileIO (context , response .config ()),
609609 response .tableMetadata ());
610610
611611 return new BaseTable (ops , fullTableName (ident ));
@@ -624,7 +624,7 @@ public Transaction createTransaction() {
624624 client ,
625625 paths .table (ident ),
626626 session ::headers ,
627- tableFileIO (response .config ()),
627+ tableFileIO (context , response .config ()),
628628 RESTTableOperations .UpdateType .CREATE ,
629629 createChanges (meta ),
630630 meta );
@@ -675,7 +675,7 @@ public Transaction replaceTransaction() {
675675 client ,
676676 paths .table (ident ),
677677 session ::headers ,
678- tableFileIO (response .config ()),
678+ tableFileIO (context , response .config ()),
679679 RESTTableOperations .UpdateType .REPLACE ,
680680 changes .build (),
681681 base );
@@ -765,24 +765,24 @@ private String fullTableName(TableIdentifier ident) {
765765 return String .format ("%s.%s" , name (), ident );
766766 }
767767
768- private FileIO newFileIO (Map <String , String > properties ) {
768+ private FileIO newFileIO (SessionContext context , Map <String , String > properties ) {
769769 if (null != ioBuilder ) {
770- return ioBuilder .apply (properties );
770+ return ioBuilder .apply (context , properties );
771771 } else {
772772 String ioImpl =
773773 properties .getOrDefault (CatalogProperties .FILE_IO_IMPL , ResolvingFileIO .class .getName ());
774774 return CatalogUtil .loadFileIO (ioImpl , properties , conf );
775775 }
776776 }
777777
778- private FileIO tableFileIO (Map <String , String > config ) {
779- if (config .isEmpty ()) {
778+ private FileIO tableFileIO (SessionContext context , Map <String , String > config ) {
779+ if (config .isEmpty () && ioBuilder == null ) {
780780 return io ; // reuse client and io since config is the same
781781 }
782782
783783 Map <String , String > fullConf = RESTUtil .merge (properties (), config );
784784
785- return newFileIO (fullConf );
785+ return newFileIO (context , fullConf );
786786 }
787787
788788 private AuthSession tableSession (Map <String , String > tableConf , AuthSession parent ) {
0 commit comments