View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.sqlfederation.resultset;
19  
20  import com.cedarsoftware.util.CaseInsensitiveMap;
21  import org.apache.calcite.linq4j.Enumerator;
22  import org.apache.calcite.rel.type.RelDataType;
23  import org.apache.calcite.schema.Schema;
24  import org.apache.shardingsphere.database.connector.core.spi.DatabaseTypedSPILoader;
25  import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
26  import org.apache.shardingsphere.infra.binder.context.segment.select.projection.Projection;
27  import org.apache.shardingsphere.infra.exception.ShardingSpherePreconditions;
28  import org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.util.ResultSetUtils;
29  import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
30  import org.apache.shardingsphere.sqlfederation.resultset.converter.DialectSQLFederationColumnTypeConverter;
31  
32  import java.io.InputStream;
33  import java.io.Reader;
34  import java.math.BigDecimal;
35  import java.net.URL;
36  import java.sql.Array;
37  import java.sql.Blob;
38  import java.sql.Clob;
39  import java.sql.Date;
40  import java.sql.ResultSetMetaData;
41  import java.sql.SQLException;
42  import java.sql.SQLFeatureNotSupportedException;
43  import java.sql.SQLWarning;
44  import java.sql.SQLXML;
45  import java.sql.Statement;
46  import java.sql.Time;
47  import java.sql.Timestamp;
48  import java.util.Arrays;
49  import java.util.Calendar;
50  import java.util.Collection;
51  import java.util.HashSet;
52  import java.util.List;
53  import java.util.Map;
54  
55  /**
56   * SQL federation result set.
57   */
58  public final class SQLFederationResultSet extends AbstractUnsupportedOperationSQLFederationResultSet {
59      
60      private static final String ASCII = "Ascii";
61      
62      private static final String UNICODE = "Unicode";
63      
64      private static final String BINARY = "Binary";
65      
66      private static final Collection<Class<?>> INVALID_FEDERATION_TYPES = new HashSet<>(Arrays.asList(Blob.class, Clob.class, Reader.class, InputStream.class, SQLXML.class));
67      
68      private final ProcessEngine processEngine = new ProcessEngine();
69      
70      private final Enumerator<?> enumerator;
71      
72      private final Map<String, Integer> columnLabelAndIndexes;
73      
74      private final SQLFederationResultSetMetaData resultSetMetaData;
75      
76      private final DialectSQLFederationColumnTypeConverter columnTypeConverter;
77      
78      private final String processId;
79      
80      private Object[] currentRows;
81      
82      private boolean wasNull;
83      
84      private boolean closed;
85      
86      public SQLFederationResultSet(final Enumerator<?> enumerator, final Schema sqlFederationSchema, final List<Projection> expandProjections, final DatabaseType databaseType,
87                                    final RelDataType resultColumnType, final String processId) {
88          this.enumerator = enumerator;
89          this.processId = processId;
90          columnTypeConverter = DatabaseTypedSPILoader.findService(DialectSQLFederationColumnTypeConverter.class, databaseType).orElse(null);
91          columnLabelAndIndexes = new CaseInsensitiveMap<>(expandProjections.size(), 1F);
92          Map<Integer, String> indexAndColumnLabels = new CaseInsensitiveMap<>(expandProjections.size(), 1F);
93          handleColumnLabelAndIndex(columnLabelAndIndexes, indexAndColumnLabels, expandProjections);
94          resultSetMetaData = new SQLFederationResultSetMetaData(sqlFederationSchema, expandProjections, databaseType, resultColumnType, indexAndColumnLabels, columnTypeConverter);
95      }
96      
97      private void handleColumnLabelAndIndex(final Map<String, Integer> columnLabelAndIndexes, final Map<Integer, String> indexAndColumnLabels, final List<Projection> expandProjections) {
98          for (int columnIndex = 1; columnIndex <= expandProjections.size(); columnIndex++) {
99              Projection projection = expandProjections.get(columnIndex - 1);
100             String columnLabel = projection.getColumnLabel();
101             columnLabelAndIndexes.putIfAbsent(columnLabel, columnIndex);
102             indexAndColumnLabels.putIfAbsent(columnIndex, columnLabel);
103         }
104     }
105     
106     @Override
107     public boolean next() {
108         try {
109             return next0();
110             // CHECKSTYLE:OFF
111         } catch (final Exception ex) {
112             // CHECKSTYLE:ON
113             close();
114             throw ex;
115         }
116     }
117     
118     private boolean next0() {
119         boolean result = enumerator.moveNext();
120         if (result) {
121             Object current = enumerator.current();
122             currentRows = null == current ? new Object[]{null} : getCurrentRows(current);
123         } else {
124             currentRows = new Object[]{null};
125             processEngine.completeSQLExecution(processId);
126         }
127         return result;
128     }
129     
130     private Object[] getCurrentRows(final Object current) {
131         return current.getClass().isArray() && !(current instanceof byte[]) ? (Object[]) current : new Object[]{current};
132     }
133     
134     @Override
135     public void close() {
136         closed = true;
137         currentRows = null;
138         try {
139             enumerator.close();
140         } finally {
141             processEngine.completeSQLExecution(processId);
142         }
143     }
144     
145     @Override
146     public boolean wasNull() {
147         return wasNull;
148     }
149     
150     @Override
151     public String getString(final int columnIndex) throws SQLException {
152         return (String) ResultSetUtils.convertValue(getValue(columnIndex, String.class), String.class);
153     }
154     
155     @Override
156     public String getString(final String columnLabel) throws SQLException {
157         return getString(getIndexFromColumnLabelAndIndexMap(columnLabel));
158     }
159     
160     @Override
161     public boolean getBoolean(final int columnIndex) throws SQLException {
162         return (boolean) ResultSetUtils.convertValue(getValue(columnIndex, boolean.class), boolean.class);
163     }
164     
165     @Override
166     public boolean getBoolean(final String columnLabel) throws SQLException {
167         return getBoolean(getIndexFromColumnLabelAndIndexMap(columnLabel));
168     }
169     
170     @Override
171     public byte getByte(final int columnIndex) throws SQLException {
172         return (byte) ResultSetUtils.convertValue(getValue(columnIndex, byte.class), byte.class);
173     }
174     
175     @Override
176     public byte getByte(final String columnLabel) throws SQLException {
177         return getByte(getIndexFromColumnLabelAndIndexMap(columnLabel));
178     }
179     
180     @Override
181     public short getShort(final int columnIndex) throws SQLException {
182         return (short) ResultSetUtils.convertValue(getValue(columnIndex, short.class), short.class);
183     }
184     
185     @Override
186     public short getShort(final String columnLabel) throws SQLException {
187         return getShort(getIndexFromColumnLabelAndIndexMap(columnLabel));
188     }
189     
190     @Override
191     public int getInt(final int columnIndex) throws SQLException {
192         return (int) ResultSetUtils.convertValue(getValue(columnIndex, int.class), int.class);
193     }
194     
195     @Override
196     public int getInt(final String columnLabel) throws SQLException {
197         return getInt(getIndexFromColumnLabelAndIndexMap(columnLabel));
198     }
199     
200     @Override
201     public long getLong(final int columnIndex) throws SQLException {
202         return (long) ResultSetUtils.convertValue(getValue(columnIndex, long.class), long.class);
203     }
204     
205     @Override
206     public long getLong(final String columnLabel) throws SQLException {
207         return getLong(getIndexFromColumnLabelAndIndexMap(columnLabel));
208     }
209     
210     @Override
211     public float getFloat(final int columnIndex) throws SQLException {
212         return (float) ResultSetUtils.convertValue(getValue(columnIndex, float.class), float.class);
213     }
214     
215     @Override
216     public float getFloat(final String columnLabel) throws SQLException {
217         return getFloat(getIndexFromColumnLabelAndIndexMap(columnLabel));
218     }
219     
220     @Override
221     public double getDouble(final int columnIndex) throws SQLException {
222         return (double) ResultSetUtils.convertValue(getValue(columnIndex, double.class), double.class);
223     }
224     
225     @Override
226     public double getDouble(final String columnLabel) throws SQLException {
227         return getDouble(getIndexFromColumnLabelAndIndexMap(columnLabel));
228     }
229     
230     @Override
231     public BigDecimal getBigDecimal(final int columnIndex, final int scale) throws SQLException {
232         return (BigDecimal) ResultSetUtils.convertValue(getValue(columnIndex, BigDecimal.class), BigDecimal.class);
233     }
234     
235     @Override
236     public BigDecimal getBigDecimal(final String columnLabel, final int scale) throws SQLException {
237         return getBigDecimal(getIndexFromColumnLabelAndIndexMap(columnLabel));
238     }
239     
240     @Override
241     public BigDecimal getBigDecimal(final int columnIndex) throws SQLException {
242         return (BigDecimal) ResultSetUtils.convertValue(getValue(columnIndex, BigDecimal.class), BigDecimal.class);
243     }
244     
245     @Override
246     public BigDecimal getBigDecimal(final String columnLabel) throws SQLException {
247         return getBigDecimal(getIndexFromColumnLabelAndIndexMap(columnLabel));
248     }
249     
250     @Override
251     public byte[] getBytes(final int columnIndex) throws SQLException {
252         return (byte[]) ResultSetUtils.convertValue(getValue(columnIndex, byte[].class), byte[].class);
253     }
254     
255     @Override
256     public byte[] getBytes(final String columnLabel) throws SQLException {
257         return getBytes(getIndexFromColumnLabelAndIndexMap(columnLabel));
258     }
259     
260     @Override
261     public Date getDate(final int columnIndex) throws SQLException {
262         return (Date) ResultSetUtils.convertValue(getValue(columnIndex, Date.class), Date.class);
263     }
264     
265     @Override
266     public Date getDate(final String columnLabel) throws SQLException {
267         return getDate(getIndexFromColumnLabelAndIndexMap(columnLabel));
268     }
269     
270     @Override
271     public Date getDate(final int columnIndex, final Calendar cal) throws SQLException {
272         return (Date) ResultSetUtils.convertValue(getCalendarValue(columnIndex), Date.class);
273     }
274     
275     @Override
276     public Date getDate(final String columnLabel, final Calendar cal) throws SQLException {
277         return getDate(getIndexFromColumnLabelAndIndexMap(columnLabel), cal);
278     }
279     
280     @Override
281     public Time getTime(final int columnIndex) throws SQLException {
282         return (Time) ResultSetUtils.convertValue(getValue(columnIndex, Time.class), Time.class);
283     }
284     
285     @Override
286     public Time getTime(final String columnLabel) throws SQLException {
287         return getTime(getIndexFromColumnLabelAndIndexMap(columnLabel));
288     }
289     
290     @Override
291     public Time getTime(final int columnIndex, final Calendar cal) throws SQLException {
292         return (Time) ResultSetUtils.convertValue(getCalendarValue(columnIndex), Time.class);
293     }
294     
295     @Override
296     public Time getTime(final String columnLabel, final Calendar cal) throws SQLException {
297         return getTime(getIndexFromColumnLabelAndIndexMap(columnLabel), cal);
298     }
299     
300     @Override
301     public Timestamp getTimestamp(final int columnIndex) throws SQLException {
302         return (Timestamp) ResultSetUtils.convertValue(getValue(columnIndex, Timestamp.class), Timestamp.class);
303     }
304     
305     @Override
306     public Timestamp getTimestamp(final String columnLabel) throws SQLException {
307         return getTimestamp(getIndexFromColumnLabelAndIndexMap(columnLabel));
308     }
309     
310     @Override
311     public Timestamp getTimestamp(final int columnIndex, final Calendar cal) throws SQLException {
312         return (Timestamp) ResultSetUtils.convertValue(getCalendarValue(columnIndex), Timestamp.class);
313     }
314     
315     @Override
316     public Timestamp getTimestamp(final String columnLabel, final Calendar cal) throws SQLException {
317         return getTimestamp(getIndexFromColumnLabelAndIndexMap(columnLabel), cal);
318     }
319     
320     @Override
321     public InputStream getAsciiStream(final int columnIndex) throws SQLException {
322         return getInputStream(ASCII);
323     }
324     
325     @Override
326     public InputStream getAsciiStream(final String columnLabel) throws SQLException {
327         return getAsciiStream(getIndexFromColumnLabelAndIndexMap(columnLabel));
328     }
329     
330     @Override
331     public InputStream getUnicodeStream(final int columnIndex) throws SQLException {
332         return getInputStream(UNICODE);
333     }
334     
335     @Override
336     public InputStream getUnicodeStream(final String columnLabel) throws SQLException {
337         return getUnicodeStream(getIndexFromColumnLabelAndIndexMap(columnLabel));
338     }
339     
340     @Override
341     public InputStream getBinaryStream(final int columnIndex) throws SQLException {
342         return getInputStream(BINARY);
343     }
344     
345     @Override
346     public InputStream getBinaryStream(final String columnLabel) throws SQLException {
347         return getBinaryStream(getIndexFromColumnLabelAndIndexMap(columnLabel));
348     }
349     
350     @Override
351     public SQLWarning getWarnings() {
352         return null;
353     }
354     
355     @Override
356     public void clearWarnings() {
357     }
358     
359     @Override
360     public ResultSetMetaData getMetaData() {
361         return resultSetMetaData;
362     }
363     
364     @Override
365     public Object getObject(final int columnIndex) throws SQLException {
366         return getValue(columnIndex, Object.class);
367     }
368     
369     @Override
370     public Object getObject(final String columnLabel) throws SQLException {
371         return getObject(getIndexFromColumnLabelAndIndexMap(columnLabel));
372     }
373     
374     @Override
375     public int findColumn(final String columnLabel) throws SQLException {
376         return getIndexFromColumnLabelAndIndexMap(columnLabel);
377     }
378     
379     @Override
380     public Reader getCharacterStream(final int columnIndex) throws SQLException {
381         return (Reader) getValue(columnIndex, Reader.class);
382     }
383     
384     @Override
385     public Reader getCharacterStream(final String columnLabel) throws SQLException {
386         return getCharacterStream(getIndexFromColumnLabelAndIndexMap(columnLabel));
387     }
388     
389     @Override
390     public void setFetchDirection(final int direction) {
391     }
392     
393     @Override
394     public int getFetchDirection() {
395         return FETCH_FORWARD;
396     }
397     
398     @Override
399     public void setFetchSize(final int rows) {
400     }
401     
402     @Override
403     public int getFetchSize() {
404         return 0;
405     }
406     
407     @Override
408     public int getType() {
409         return TYPE_FORWARD_ONLY;
410     }
411     
412     @Override
413     public int getConcurrency() {
414         return CONCUR_READ_ONLY;
415     }
416     
417     @Override
418     public Statement getStatement() {
419         return null;
420     }
421     
422     @Override
423     public Blob getBlob(final int columnIndex) throws SQLException {
424         return (Blob) getValue(columnIndex, Blob.class);
425     }
426     
427     @Override
428     public Blob getBlob(final String columnLabel) throws SQLException {
429         return getBlob(getIndexFromColumnLabelAndIndexMap(columnLabel));
430     }
431     
432     @Override
433     public Clob getClob(final int columnIndex) throws SQLException {
434         return (Clob) getValue(columnIndex, Clob.class);
435     }
436     
437     @Override
438     public Clob getClob(final String columnLabel) throws SQLException {
439         return getClob(getIndexFromColumnLabelAndIndexMap(columnLabel));
440     }
441     
442     @Override
443     public Array getArray(final int columnIndex) throws SQLException {
444         return (Array) getValue(columnIndex, Array.class);
445     }
446     
447     @Override
448     public Array getArray(final String columnLabel) throws SQLException {
449         return getArray(getIndexFromColumnLabelAndIndexMap(columnLabel));
450     }
451     
452     @Override
453     public URL getURL(final int columnIndex) throws SQLException {
454         return (URL) getValue(columnIndex, URL.class);
455     }
456     
457     @Override
458     public URL getURL(final String columnLabel) throws SQLException {
459         return getURL(getIndexFromColumnLabelAndIndexMap(columnLabel));
460     }
461     
462     @Override
463     public boolean isClosed() {
464         return closed;
465     }
466     
467     @Override
468     public SQLXML getSQLXML(final int columnIndex) throws SQLException {
469         return (SQLXML) getValue(columnIndex, SQLXML.class);
470     }
471     
472     @Override
473     public SQLXML getSQLXML(final String columnLabel) throws SQLException {
474         return getSQLXML(getIndexFromColumnLabelAndIndexMap(columnLabel));
475     }
476     
477     @Override
478     public String getNString(final int columnIndex) throws SQLException {
479         return getString(columnIndex);
480     }
481     
482     @Override
483     public String getNString(final String columnLabel) throws SQLException {
484         return getString(columnLabel);
485     }
486     
487     private Integer getIndexFromColumnLabelAndIndexMap(final String columnLabel) throws SQLException {
488         Integer result = columnLabelAndIndexes.get(columnLabel);
489         ShardingSpherePreconditions.checkNotNull(result, () -> new SQLFeatureNotSupportedException(String.format("can not get index from column label `%s`", columnLabel)));
490         return result;
491     }
492     
493     private Object getValue(final int columnIndex, final Class<?> type) throws SQLException {
494         ShardingSpherePreconditions.checkNotContains(INVALID_FEDERATION_TYPES, type, () -> new SQLFeatureNotSupportedException(String.format("Get value from `%s`", type.getName())));
495         Object result = currentRows[columnIndex - 1];
496         wasNull = null == result;
497         return null == columnTypeConverter ? result : columnTypeConverter.convertColumnValue(result);
498     }
499     
500     private Object getCalendarValue(final int columnIndex) {
501         // TODO implement with calendar
502         Object result = currentRows[columnIndex - 1];
503         wasNull = null == result;
504         return result;
505     }
506     
507     private InputStream getInputStream(final String type) throws SQLException {
508         throw new SQLFeatureNotSupportedException(String.format("Get input stream from `%s`", type));
509     }
510 }