The CarbonData flink integration module is used to connect Flink and Carbon. The module provides a set of Flink BulkWriter implementations (CarbonLocalWriter and CarbonS3Writer). The data is processed by Flink and finally written into the stage directory of the target table by the corresponding writer.
By default, the data in the table stage directory cannot be queried immediately; it becomes queryable only after the INSERT INTO $tableName STAGE command is executed.
Since the flink data written to carbon is unbounded, the user should execute the INSERT INTO $tableName STAGE command periodically, both to make the data visible in time and to keep the amount of data processed by each execution under control.
The execution interval of the INSERT INTO $tableName STAGE command should take into account the data visibility requirements of the actual business and the flink data traffic. When the data visibility requirements are high or the data traffic is large, the execution interval should be shortened accordingly, as sketched below.
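A minimal sketch of scheduling the command at a fixed interval, assuming an existing SparkSession named spark (a hypothetical variable) and a target table named test as in the examples below; the one-minute interval is only illustrative and should be tuned to the actual traffic and visibility requirements:

import java.util.concurrent.{Executors, TimeUnit}

// Assumption: 'spark' is an existing SparkSession with CarbonData configured.
val scheduler = Executors.newSingleThreadScheduledExecutor()
scheduler.scheduleAtFixedRate(new Runnable {
  override def run(): Unit = {
    // Move the data collected in the stage directory into the table, making it queryable.
    spark.sql("INSERT INTO test STAGE")
  }
}, 1L, 1L, TimeUnit.MINUTES)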
A typical scenario is that the data is cleaned and preprocessed by Flink, and then written to Carbon for subsequent analysis and queries.
Typical flink stream: Source -> Process -> Output (Carbon Writer Sink)
Pseudo code and description:
// Import dependencies.
import java.util.Properties
import org.apache.carbon.flink.CarbonWriterFactory
import org.apache.carbon.flink.ProxyFileSystem
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
// Specify database name.
val databaseName = "default"
// Specify target table name.
val tableName = "test"
// Table path of the target table.
val tablePath = "/data/warehouse/test"
// Specify local temporary path.
val dataTempPath = "/data/temp/"
val tableProperties = new Properties
// Set the table properties here.
val writerProperties = new Properties
// Set the writer properties here, such as temp path, commit threshold, access key, secret key, endpoint, etc.
val carbonProperties = new Properties
// Set the carbon properties here, such as date format, store location, etc.
// Create carbon bulk writer factory. Two writer types are supported: 'Local' and 'S3'.
val writerFactory = CarbonWriterFactory.builder("Local").build(
databaseName,
tableName,
tablePath,
tableProperties,
writerProperties,
carbonProperties
)
// Build a flink stream and run it.
// 1. Create a new flink execution environment.
val environment = StreamExecutionEnvironment.getExecutionEnvironment
// Set flink environment configuration here, such as parallelism, checkpointing, restart strategy, etc.
// 2. Create the flink data source; it may be a kafka source, a custom source, or others.
// The data type of the source should be Array[AnyRef].
// The array length should equal the table column count, and the order of values in the array should match the table column order.
val source = ...
// 3. Create flink stream and set source.
val stream = environment.addSource(source)
// 4. Add other flink operators here.
// ...
// 5. Set flink stream target (write data to carbon with a write sink).
stream.addSink(StreamingFileSink.forBulkFormat(new Path(ProxyFileSystem.DEFAULT_URI), writerFactory).build)
// 6. Run flink stream.
try {
environment.execute
} catch {
case exception: Exception =>
// Handle execute exception here.
}
Writer properties of the local writer:

Property | Name | Description |
---|---|---|
CarbonLocalProperty.DATA_TEMP_PATH | carbon.writer.local.data.temp.path | Usually a local path. Data is written to the temp path first and finally moved to the target data path. |
CarbonLocalProperty.COMMIT_THRESHOLD | carbon.writer.local.commit.threshold | When the written record count reaches the threshold, the writer flushes and moves the data to the target data path. |
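For example, the local writer could be configured as follows; the temp path and threshold value are illustrative:

import java.util.Properties
import org.apache.carbon.flink.CarbonLocalProperty

val writerProperties = new Properties
// Illustrative values; tune the temp path and commit threshold to the actual deployment.
writerProperties.setProperty(CarbonLocalProperty.DATA_TEMP_PATH, "/data/temp/")
writerProperties.setProperty(CarbonLocalProperty.COMMIT_THRESHOLD, "1024")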
Writer properties of the S3 writer:

Property | Name | Description |
---|---|---|
CarbonS3Property.ACCESS_KEY | carbon.writer.s3.access.key | Access key of the S3 file system |
CarbonS3Property.SECRET_KEY | carbon.writer.s3.secret.key | Secret key of the S3 file system |
CarbonS3Property.ENDPOINT | carbon.writer.s3.endpoint | Endpoint of the S3 file system |
CarbonS3Property.DATA_TEMP_PATH | carbon.writer.s3.data.temp.path | Usually a local path. Data is written to the temp path first and finally moved to the target data path. |
CarbonS3Property.COMMIT_THRESHOLD | carbon.writer.s3.commit.threshold | When the written record count reaches the threshold, the writer flushes and moves the data to the target data path. |
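Similarly, a sketch of configuring the S3 writer and building the corresponding factory with the 'S3' writer type; the credentials, endpoint, and threshold are placeholders:

import java.util.Properties
import org.apache.carbon.flink.CarbonS3Property
import org.apache.carbon.flink.CarbonWriterFactory

val writerProperties = new Properties
// Placeholder credentials and endpoint; replace with the actual S3 configuration.
writerProperties.setProperty(CarbonS3Property.ACCESS_KEY, "your-access-key")
writerProperties.setProperty(CarbonS3Property.SECRET_KEY, "your-secret-key")
writerProperties.setProperty(CarbonS3Property.ENDPOINT, "http://s3.example.com")
writerProperties.setProperty(CarbonS3Property.DATA_TEMP_PATH, "/data/temp/")
writerProperties.setProperty(CarbonS3Property.COMMIT_THRESHOLD, "1024")

// Use the 'S3' writer type instead of 'Local' when building the factory.
val writerFactory = CarbonWriterFactory.builder("S3").build(
  databaseName,
  tableName,
  tablePath,
  tableProperties,
  writerProperties,
  carbonProperties
)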
Refer to the Grammar Description for the syntax.
Create the target table.
CREATE TABLE test (col1 string, col2 string, col3 string) STORED AS carbondata
Writing flink data to a local carbon table.
import java.util.Properties
import org.apache.carbon.flink.CarbonLocalProperty
import org.apache.carbon.flink.CarbonWriterFactory
import org.apache.carbon.flink.ProxyFileSystem
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.source.SourceFunction
val databaseName = "default"
val tableName = "test"
val tablePath = "/data/warehouse/test"
val dataTempPath = "/data/temp/"
val tableProperties = new Properties
val writerProperties = new Properties
writerProperties.setProperty(CarbonLocalProperty.DATA_TEMP_PATH, dataTempPath)
val carbonProperties = new Properties
carbonProperties.setProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
carbonProperties.setProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
carbonProperties.setProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB, "1024")
val writerFactory = CarbonWriterFactory.builder("Local").build(
databaseName,
tableName,
tablePath,
tableProperties,
writerProperties,
carbonProperties
)
val environment = StreamExecutionEnvironment.getExecutionEnvironment
environment.setParallelism(1)
environment.enableCheckpointing(2000L)
environment.setRestartStrategy(RestartStrategies.noRestart)
// Define a custom source.
val source = new SourceFunction[Array[AnyRef]]() {
override def run(sourceContext: SourceFunction.SourceContext[Array[AnyRef]]): Unit = {
// The array length should equal the table column count, and the order of values in the array should match the table column order.
val data = new Array[AnyRef](3)
data(0) = "value1"
data(1) = "value2"
data(2) = "value3"
sourceContext.collect(data)
}
override def cancel(): Unit = {
// do something.
}
}
val stream = environment.addSource(source)
val streamSink = StreamingFileSink.forBulkFormat(new Path(ProxyFileSystem.DEFAULT_URI), writerFactory).build
stream.addSink(streamSink)
try {
environment.execute
} catch {
case exception: Exception =>
// Handle the execute exception here.
throw new UnsupportedOperationException(exception)
}
Insert the data into the table from the stage directory.
INSERT INTO test STAGE
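After this command completes, the data written by the flink job becomes visible to queries. A minimal sketch of running the command and verifying the result, assuming an existing SparkSession named spark (a hypothetical variable) with CarbonData configured:

// Assumption: 'spark' is an existing SparkSession with CarbonData configured.
spark.sql("INSERT INTO test STAGE")
// The rows written by the flink job are now visible to queries.
spark.sql("SELECT count(*) FROM test").show()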