1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.sqlfederation.resultset;
19
20 import com.cedarsoftware.util.CaseInsensitiveMap;
21 import org.apache.calcite.linq4j.Enumerator;
22 import org.apache.calcite.rel.type.RelDataType;
23 import org.apache.calcite.schema.Schema;
24 import org.apache.shardingsphere.database.connector.core.spi.DatabaseTypedSPILoader;
25 import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
26 import org.apache.shardingsphere.infra.binder.context.segment.select.projection.Projection;
27 import org.apache.shardingsphere.infra.exception.ShardingSpherePreconditions;
28 import org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.util.ResultSetUtils;
29 import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
30 import org.apache.shardingsphere.sqlfederation.resultset.converter.DialectSQLFederationColumnTypeConverter;
31
32 import java.io.InputStream;
33 import java.io.Reader;
34 import java.math.BigDecimal;
35 import java.net.URL;
36 import java.sql.Array;
37 import java.sql.Blob;
38 import java.sql.Clob;
39 import java.sql.Date;
40 import java.sql.ResultSetMetaData;
41 import java.sql.SQLException;
42 import java.sql.SQLFeatureNotSupportedException;
43 import java.sql.SQLWarning;
44 import java.sql.SQLXML;
45 import java.sql.Statement;
46 import java.sql.Time;
47 import java.sql.Timestamp;
48 import java.util.Arrays;
49 import java.util.Calendar;
50 import java.util.Collection;
51 import java.util.HashSet;
52 import java.util.List;
53 import java.util.Map;
54
55
56
57
58 public final class SQLFederationResultSet extends AbstractUnsupportedOperationSQLFederationResultSet {
59
60 private static final String ASCII = "Ascii";
61
62 private static final String UNICODE = "Unicode";
63
64 private static final String BINARY = "Binary";
65
66 private static final Collection<Class<?>> INVALID_FEDERATION_TYPES = new HashSet<>(Arrays.asList(Blob.class, Clob.class, Reader.class, InputStream.class, SQLXML.class));
67
68 private final ProcessEngine processEngine = new ProcessEngine();
69
70 private final Enumerator<?> enumerator;
71
72 private final Map<String, Integer> columnLabelAndIndexes;
73
74 private final SQLFederationResultSetMetaData resultSetMetaData;
75
76 private final DialectSQLFederationColumnTypeConverter columnTypeConverter;
77
78 private final String processId;
79
80 private Object[] currentRows;
81
82 private boolean wasNull;
83
84 private boolean closed;
85
86 public SQLFederationResultSet(final Enumerator<?> enumerator, final Schema sqlFederationSchema, final List<Projection> expandProjections, final DatabaseType databaseType,
87 final RelDataType resultColumnType, final String processId) {
88 this.enumerator = enumerator;
89 this.processId = processId;
90 columnTypeConverter = DatabaseTypedSPILoader.findService(DialectSQLFederationColumnTypeConverter.class, databaseType).orElse(null);
91 columnLabelAndIndexes = new CaseInsensitiveMap<>(expandProjections.size(), 1F);
92 Map<Integer, String> indexAndColumnLabels = new CaseInsensitiveMap<>(expandProjections.size(), 1F);
93 handleColumnLabelAndIndex(columnLabelAndIndexes, indexAndColumnLabels, expandProjections);
94 resultSetMetaData = new SQLFederationResultSetMetaData(sqlFederationSchema, expandProjections, databaseType, resultColumnType, indexAndColumnLabels, columnTypeConverter);
95 }
96
97 private void handleColumnLabelAndIndex(final Map<String, Integer> columnLabelAndIndexes, final Map<Integer, String> indexAndColumnLabels, final List<Projection> expandProjections) {
98 for (int columnIndex = 1; columnIndex <= expandProjections.size(); columnIndex++) {
99 Projection projection = expandProjections.get(columnIndex - 1);
100 String columnLabel = projection.getColumnLabel();
101 columnLabelAndIndexes.putIfAbsent(columnLabel, columnIndex);
102 indexAndColumnLabels.putIfAbsent(columnIndex, columnLabel);
103 }
104 }
105
106 @Override
107 public boolean next() {
108 try {
109 return next0();
110
111 } catch (final Exception ex) {
112
113 close();
114 throw ex;
115 }
116 }
117
118 private boolean next0() {
119 boolean result = enumerator.moveNext();
120 if (result) {
121 Object current = enumerator.current();
122 currentRows = null == current ? new Object[]{null} : getCurrentRows(current);
123 } else {
124 currentRows = new Object[]{null};
125 processEngine.completeSQLExecution(processId);
126 }
127 return result;
128 }
129
130 private Object[] getCurrentRows(final Object current) {
131 return current.getClass().isArray() && !(current instanceof byte[]) ? (Object[]) current : new Object[]{current};
132 }
133
134 @Override
135 public void close() {
136 closed = true;
137 currentRows = null;
138 try {
139 enumerator.close();
140 } finally {
141 processEngine.completeSQLExecution(processId);
142 }
143 }
144
145 @Override
146 public boolean wasNull() {
147 return wasNull;
148 }
149
150 @Override
151 public String getString(final int columnIndex) throws SQLException {
152 return (String) ResultSetUtils.convertValue(getValue(columnIndex, String.class), String.class);
153 }
154
155 @Override
156 public String getString(final String columnLabel) throws SQLException {
157 return getString(getIndexFromColumnLabelAndIndexMap(columnLabel));
158 }
159
160 @Override
161 public boolean getBoolean(final int columnIndex) throws SQLException {
162 return (boolean) ResultSetUtils.convertValue(getValue(columnIndex, boolean.class), boolean.class);
163 }
164
165 @Override
166 public boolean getBoolean(final String columnLabel) throws SQLException {
167 return getBoolean(getIndexFromColumnLabelAndIndexMap(columnLabel));
168 }
169
170 @Override
171 public byte getByte(final int columnIndex) throws SQLException {
172 return (byte) ResultSetUtils.convertValue(getValue(columnIndex, byte.class), byte.class);
173 }
174
175 @Override
176 public byte getByte(final String columnLabel) throws SQLException {
177 return getByte(getIndexFromColumnLabelAndIndexMap(columnLabel));
178 }
179
180 @Override
181 public short getShort(final int columnIndex) throws SQLException {
182 return (short) ResultSetUtils.convertValue(getValue(columnIndex, short.class), short.class);
183 }
184
185 @Override
186 public short getShort(final String columnLabel) throws SQLException {
187 return getShort(getIndexFromColumnLabelAndIndexMap(columnLabel));
188 }
189
190 @Override
191 public int getInt(final int columnIndex) throws SQLException {
192 return (int) ResultSetUtils.convertValue(getValue(columnIndex, int.class), int.class);
193 }
194
195 @Override
196 public int getInt(final String columnLabel) throws SQLException {
197 return getInt(getIndexFromColumnLabelAndIndexMap(columnLabel));
198 }
199
200 @Override
201 public long getLong(final int columnIndex) throws SQLException {
202 return (long) ResultSetUtils.convertValue(getValue(columnIndex, long.class), long.class);
203 }
204
205 @Override
206 public long getLong(final String columnLabel) throws SQLException {
207 return getLong(getIndexFromColumnLabelAndIndexMap(columnLabel));
208 }
209
210 @Override
211 public float getFloat(final int columnIndex) throws SQLException {
212 return (float) ResultSetUtils.convertValue(getValue(columnIndex, float.class), float.class);
213 }
214
215 @Override
216 public float getFloat(final String columnLabel) throws SQLException {
217 return getFloat(getIndexFromColumnLabelAndIndexMap(columnLabel));
218 }
219
220 @Override
221 public double getDouble(final int columnIndex) throws SQLException {
222 return (double) ResultSetUtils.convertValue(getValue(columnIndex, double.class), double.class);
223 }
224
225 @Override
226 public double getDouble(final String columnLabel) throws SQLException {
227 return getDouble(getIndexFromColumnLabelAndIndexMap(columnLabel));
228 }
229
230 @Override
231 public BigDecimal getBigDecimal(final int columnIndex, final int scale) throws SQLException {
232 return (BigDecimal) ResultSetUtils.convertValue(getValue(columnIndex, BigDecimal.class), BigDecimal.class);
233 }
234
235 @Override
236 public BigDecimal getBigDecimal(final String columnLabel, final int scale) throws SQLException {
237 return getBigDecimal(getIndexFromColumnLabelAndIndexMap(columnLabel));
238 }
239
240 @Override
241 public BigDecimal getBigDecimal(final int columnIndex) throws SQLException {
242 return (BigDecimal) ResultSetUtils.convertValue(getValue(columnIndex, BigDecimal.class), BigDecimal.class);
243 }
244
245 @Override
246 public BigDecimal getBigDecimal(final String columnLabel) throws SQLException {
247 return getBigDecimal(getIndexFromColumnLabelAndIndexMap(columnLabel));
248 }
249
250 @Override
251 public byte[] getBytes(final int columnIndex) throws SQLException {
252 return (byte[]) ResultSetUtils.convertValue(getValue(columnIndex, byte[].class), byte[].class);
253 }
254
255 @Override
256 public byte[] getBytes(final String columnLabel) throws SQLException {
257 return getBytes(getIndexFromColumnLabelAndIndexMap(columnLabel));
258 }
259
260 @Override
261 public Date getDate(final int columnIndex) throws SQLException {
262 return (Date) ResultSetUtils.convertValue(getValue(columnIndex, Date.class), Date.class);
263 }
264
265 @Override
266 public Date getDate(final String columnLabel) throws SQLException {
267 return getDate(getIndexFromColumnLabelAndIndexMap(columnLabel));
268 }
269
270 @Override
271 public Date getDate(final int columnIndex, final Calendar cal) throws SQLException {
272 return (Date) ResultSetUtils.convertValue(getCalendarValue(columnIndex), Date.class);
273 }
274
275 @Override
276 public Date getDate(final String columnLabel, final Calendar cal) throws SQLException {
277 return getDate(getIndexFromColumnLabelAndIndexMap(columnLabel), cal);
278 }
279
280 @Override
281 public Time getTime(final int columnIndex) throws SQLException {
282 return (Time) ResultSetUtils.convertValue(getValue(columnIndex, Time.class), Time.class);
283 }
284
285 @Override
286 public Time getTime(final String columnLabel) throws SQLException {
287 return getTime(getIndexFromColumnLabelAndIndexMap(columnLabel));
288 }
289
290 @Override
291 public Time getTime(final int columnIndex, final Calendar cal) throws SQLException {
292 return (Time) ResultSetUtils.convertValue(getCalendarValue(columnIndex), Time.class);
293 }
294
295 @Override
296 public Time getTime(final String columnLabel, final Calendar cal) throws SQLException {
297 return getTime(getIndexFromColumnLabelAndIndexMap(columnLabel), cal);
298 }
299
300 @Override
301 public Timestamp getTimestamp(final int columnIndex) throws SQLException {
302 return (Timestamp) ResultSetUtils.convertValue(getValue(columnIndex, Timestamp.class), Timestamp.class);
303 }
304
305 @Override
306 public Timestamp getTimestamp(final String columnLabel) throws SQLException {
307 return getTimestamp(getIndexFromColumnLabelAndIndexMap(columnLabel));
308 }
309
310 @Override
311 public Timestamp getTimestamp(final int columnIndex, final Calendar cal) throws SQLException {
312 return (Timestamp) ResultSetUtils.convertValue(getCalendarValue(columnIndex), Timestamp.class);
313 }
314
315 @Override
316 public Timestamp getTimestamp(final String columnLabel, final Calendar cal) throws SQLException {
317 return getTimestamp(getIndexFromColumnLabelAndIndexMap(columnLabel), cal);
318 }
319
320 @Override
321 public InputStream getAsciiStream(final int columnIndex) throws SQLException {
322 return getInputStream(ASCII);
323 }
324
325 @Override
326 public InputStream getAsciiStream(final String columnLabel) throws SQLException {
327 return getAsciiStream(getIndexFromColumnLabelAndIndexMap(columnLabel));
328 }
329
330 @Override
331 public InputStream getUnicodeStream(final int columnIndex) throws SQLException {
332 return getInputStream(UNICODE);
333 }
334
335 @Override
336 public InputStream getUnicodeStream(final String columnLabel) throws SQLException {
337 return getUnicodeStream(getIndexFromColumnLabelAndIndexMap(columnLabel));
338 }
339
340 @Override
341 public InputStream getBinaryStream(final int columnIndex) throws SQLException {
342 return getInputStream(BINARY);
343 }
344
345 @Override
346 public InputStream getBinaryStream(final String columnLabel) throws SQLException {
347 return getBinaryStream(getIndexFromColumnLabelAndIndexMap(columnLabel));
348 }
349
350 @Override
351 public SQLWarning getWarnings() {
352 return null;
353 }
354
355 @Override
356 public void clearWarnings() {
357 }
358
359 @Override
360 public ResultSetMetaData getMetaData() {
361 return resultSetMetaData;
362 }
363
364 @Override
365 public Object getObject(final int columnIndex) throws SQLException {
366 return getValue(columnIndex, Object.class);
367 }
368
369 @Override
370 public Object getObject(final String columnLabel) throws SQLException {
371 return getObject(getIndexFromColumnLabelAndIndexMap(columnLabel));
372 }
373
374 @Override
375 public int findColumn(final String columnLabel) throws SQLException {
376 return getIndexFromColumnLabelAndIndexMap(columnLabel);
377 }
378
379 @Override
380 public Reader getCharacterStream(final int columnIndex) throws SQLException {
381 return (Reader) getValue(columnIndex, Reader.class);
382 }
383
384 @Override
385 public Reader getCharacterStream(final String columnLabel) throws SQLException {
386 return getCharacterStream(getIndexFromColumnLabelAndIndexMap(columnLabel));
387 }
388
389 @Override
390 public void setFetchDirection(final int direction) {
391 }
392
393 @Override
394 public int getFetchDirection() {
395 return FETCH_FORWARD;
396 }
397
398 @Override
399 public void setFetchSize(final int rows) {
400 }
401
402 @Override
403 public int getFetchSize() {
404 return 0;
405 }
406
407 @Override
408 public int getType() {
409 return TYPE_FORWARD_ONLY;
410 }
411
412 @Override
413 public int getConcurrency() {
414 return CONCUR_READ_ONLY;
415 }
416
417 @Override
418 public Statement getStatement() {
419 return null;
420 }
421
422 @Override
423 public Blob getBlob(final int columnIndex) throws SQLException {
424 return (Blob) getValue(columnIndex, Blob.class);
425 }
426
427 @Override
428 public Blob getBlob(final String columnLabel) throws SQLException {
429 return getBlob(getIndexFromColumnLabelAndIndexMap(columnLabel));
430 }
431
432 @Override
433 public Clob getClob(final int columnIndex) throws SQLException {
434 return (Clob) getValue(columnIndex, Clob.class);
435 }
436
437 @Override
438 public Clob getClob(final String columnLabel) throws SQLException {
439 return getClob(getIndexFromColumnLabelAndIndexMap(columnLabel));
440 }
441
442 @Override
443 public Array getArray(final int columnIndex) throws SQLException {
444 return (Array) getValue(columnIndex, Array.class);
445 }
446
447 @Override
448 public Array getArray(final String columnLabel) throws SQLException {
449 return getArray(getIndexFromColumnLabelAndIndexMap(columnLabel));
450 }
451
452 @Override
453 public URL getURL(final int columnIndex) throws SQLException {
454 return (URL) getValue(columnIndex, URL.class);
455 }
456
457 @Override
458 public URL getURL(final String columnLabel) throws SQLException {
459 return getURL(getIndexFromColumnLabelAndIndexMap(columnLabel));
460 }
461
462 @Override
463 public boolean isClosed() {
464 return closed;
465 }
466
467 @Override
468 public SQLXML getSQLXML(final int columnIndex) throws SQLException {
469 return (SQLXML) getValue(columnIndex, SQLXML.class);
470 }
471
472 @Override
473 public SQLXML getSQLXML(final String columnLabel) throws SQLException {
474 return getSQLXML(getIndexFromColumnLabelAndIndexMap(columnLabel));
475 }
476
477 @Override
478 public String getNString(final int columnIndex) throws SQLException {
479 return getString(columnIndex);
480 }
481
482 @Override
483 public String getNString(final String columnLabel) throws SQLException {
484 return getString(columnLabel);
485 }
486
487 private Integer getIndexFromColumnLabelAndIndexMap(final String columnLabel) throws SQLException {
488 Integer result = columnLabelAndIndexes.get(columnLabel);
489 ShardingSpherePreconditions.checkNotNull(result, () -> new SQLFeatureNotSupportedException(String.format("can not get index from column label `%s`", columnLabel)));
490 return result;
491 }
492
493 private Object getValue(final int columnIndex, final Class<?> type) throws SQLException {
494 ShardingSpherePreconditions.checkNotContains(INVALID_FEDERATION_TYPES, type, () -> new SQLFeatureNotSupportedException(String.format("Get value from `%s`", type.getName())));
495 Object result = currentRows[columnIndex - 1];
496 wasNull = null == result;
497 return null == columnTypeConverter ? result : columnTypeConverter.convertColumnValue(result);
498 }
499
500 private Object getCalendarValue(final int columnIndex) {
501
502 Object result = currentRows[columnIndex - 1];
503 wasNull = null == result;
504 return result;
505 }
506
507 private InputStream getInputStream(final String type) throws SQLException {
508 throw new SQLFeatureNotSupportedException(String.format("Get input stream from `%s`", type));
509 }
510 }