Skip to content

Commit 0c33f72

Browse files
authored
feat: add hudi flink source split POJOs (#17483)
1 parent 5c817c3 commit 0c33f72

File tree

5 files changed

+295
-0
lines changed

5 files changed

+295
-0
lines changed
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hudi.source.split;
20+
21+
import org.apache.hudi.common.util.Option;
22+
23+
import org.apache.flink.api.connector.source.SourceSplit;
24+
25+
import javax.annotation.Nullable;
26+
27+
import java.io.Serializable;
28+
import java.util.List;
29+
30+
/**
31+
* Hoodie SourceSplit implementation for source V2.
32+
*/
33+
public class HoodieSourceSplit implements SourceSplit, Serializable {
34+
private static final long serialVersionUID = 1L;
35+
36+
private static final long NUM_NO_CONSUMPTION = 0L;
37+
38+
// the split number
39+
private final int splitNum;
40+
// the base file of a file slice
41+
private final Option<String> basePath;
42+
// change log files of a file slice
43+
private final Option<List<String>> logPaths;
44+
// the base table path
45+
private final String tablePath;
46+
// source merge type
47+
private final String mergeType;
48+
// file id of file splice
49+
protected String fileId;
50+
51+
// for streaming reader to record the consumed offset,
52+
// which is the start of next round reading.
53+
private long consumed = NUM_NO_CONSUMPTION;
54+
55+
// for failure recovering
56+
private int fileOffset;
57+
private long recordOffset;
58+
59+
public HoodieSourceSplit(
60+
int splitNum,
61+
@Nullable String basePath,
62+
Option<List<String>> logPaths,
63+
String tablePath,
64+
String mergeType,
65+
String fileId) {
66+
this.splitNum = splitNum;
67+
this.basePath = Option.ofNullable(basePath);
68+
this.logPaths = logPaths;
69+
this.tablePath = tablePath;
70+
this.mergeType = mergeType;
71+
this.fileId = fileId;
72+
this.fileOffset = 0;
73+
this.recordOffset = 0L;
74+
}
75+
76+
@Override
77+
public String splitId() {
78+
return toString();
79+
}
80+
81+
public String getFileId() {
82+
return fileId;
83+
}
84+
85+
public void setFileId(String fileId) {
86+
this.fileId = fileId;
87+
}
88+
89+
public Option<String> getBasePath() {
90+
return basePath;
91+
}
92+
93+
public Option<List<String>> getLogPaths() {
94+
return logPaths;
95+
}
96+
97+
public String getTablePath() {
98+
return tablePath;
99+
}
100+
101+
public String getMergeType() {
102+
return mergeType;
103+
}
104+
105+
public void consume() {
106+
this.consumed += 1L;
107+
}
108+
109+
public long getConsumed() {
110+
return consumed;
111+
}
112+
113+
public boolean isConsumed() {
114+
return this.consumed != NUM_NO_CONSUMPTION;
115+
}
116+
117+
public int getFileOffset() {
118+
return fileOffset;
119+
}
120+
121+
public long getRecordOffset() {
122+
return recordOffset;
123+
}
124+
125+
public void updatePosition(int newFileOffset, long newRecordOffset) {
126+
fileOffset = newFileOffset;
127+
recordOffset = newRecordOffset;
128+
}
129+
130+
@Override
131+
public String toString() {
132+
return "HoodieSourceSplit{"
133+
+ "splitNum=" + splitNum
134+
+ ", basePath=" + basePath
135+
+ ", logPaths=" + logPaths
136+
+ ", tablePath='" + tablePath + '\''
137+
+ ", mergeType='" + mergeType + '\''
138+
+ '}';
139+
}
140+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hudi.source.split;
20+
21+
/**
22+
* Hoodie source split state that will be snapshot to checkpoint within enumerator.
23+
*/
24+
public class HoodieSourceSplitState {
25+
private final HoodieSourceSplit split;
26+
private final HoodieSourceSplitStatus status;
27+
28+
public HoodieSourceSplitState(HoodieSourceSplit split, HoodieSourceSplitStatus status) {
29+
this.split = split;
30+
this.status = status;
31+
}
32+
33+
public HoodieSourceSplit split() {
34+
return split;
35+
}
36+
37+
public HoodieSourceSplitStatus status() {
38+
return status;
39+
}
40+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hudi.source.split;
20+
21+
/**
22+
* Hoodie source split status.
23+
*/
24+
public enum HoodieSourceSplitStatus {
25+
// Created but not assigned
26+
UNASSIGNED,
27+
// Assigned to reader
28+
ASSIGNED,
29+
// Completed processing
30+
COMPLETED
31+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hudi.source.split;
20+
21+
import java.io.Serializable;
22+
import java.util.Comparator;
23+
24+
/**
25+
* Comparator interface for HoodieSourceSplit.
26+
*/
27+
public interface SerializableComparator<T> extends Comparator<T>, Serializable {}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hudi.source.split;
20+
21+
import org.apache.flink.annotation.Internal;
22+
import org.apache.flink.api.connector.source.SourceEvent;
23+
24+
import java.util.Collection;
25+
import java.util.Collections;
26+
27+
/**
28+
* SourceEvent for hoodie split request passed between the SourceReaders and Enumerators.
29+
*/
30+
@Internal
31+
public class SplitRequestEvent implements SourceEvent {
32+
private static final long serialVersionUID = 1L;
33+
34+
private final Collection<String> finishedSplitIds;
35+
private final String requesterHostname;
36+
37+
public SplitRequestEvent() {
38+
this(Collections.emptyList());
39+
}
40+
41+
public SplitRequestEvent(Collection<String> finishedSplitIds) {
42+
this(finishedSplitIds, null);
43+
}
44+
45+
public SplitRequestEvent(Collection<String> finishedSplitIds, String requesterHostname) {
46+
this.finishedSplitIds = finishedSplitIds;
47+
this.requesterHostname = requesterHostname;
48+
}
49+
50+
public Collection<String> finishedSplitIds() {
51+
return finishedSplitIds;
52+
}
53+
54+
public String requesterHostname() {
55+
return requesterHostname;
56+
}
57+
}

0 commit comments

Comments
 (0)