1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.core.importer;
19
20 import lombok.RequiredArgsConstructor;
21 import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
22 import org.apache.shardingsphere.data.pipeline.core.execute.AbstractPipelineLifecycleRunnable;
23 import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
24 import org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord;
25 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
26 import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressListener;
27 import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
28 import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
29
30 import java.util.List;
31
32
33
34
35 @RequiredArgsConstructor
36 public final class SingleChannelConsumerImporter extends AbstractPipelineLifecycleRunnable implements Importer {
37
38 private final PipelineChannel channel;
39
40 private final int batchSize;
41
42 private final long timeoutMillis;
43
44 private final PipelineSink sink;
45
46 private final PipelineJobProgressListener jobProgressListener;
47
48 @Override
49 protected void runBlocking() {
50 while (isRunning()) {
51 List<Record> records = channel.fetch(batchSize, timeoutMillis);
52 if (records.isEmpty()) {
53 continue;
54 }
55 PipelineJobProgressUpdatedParameter updatedParam = sink.write("", records);
56 channel.ack(records);
57 jobProgressListener.onProgressUpdated(updatedParam);
58 if (FinishedRecord.class.equals(records.get(records.size() - 1).getClass())) {
59 break;
60 }
61 }
62 }
63
64 @Override
65 protected void doStop() {
66 QuietlyCloser.close(sink);
67 }
68 }