CarbonData DML statements are documented here.
This command is used to load CSV files into CarbonData. OPTIONS are not mandatory for the data loading process.
LOAD DATA INPATH 'folder_path'
INTO TABLE [db_name.]table_name
OPTIONS(property_name=property_value, ...)
NOTE:
* Use the 'file://' prefix to indicate a local input file path; this is supported only in local mode.
* When running in cluster mode, upload all input files to a distributed file system, for example 'hdfs://' for HDFS.
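For illustration, a minimal load command might look like the following (the database name, table name, and file path here are hypothetical):
LOAD DATA INPATH 'hdfs://hacluster/user/loaddata/sales.csv'
INTO TABLE default.sales
OPTIONS('DELIMITER'=',', 'QUOTECHAR'='"')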
Supported Properties:
Property | Description |
---|---|
DELIMITER | Character used to separate the data in the input csv file |
QUOTECHAR | Character used to quote the data in the input csv file |
LINE_SEPARATOR | Characters used to specify the line separator in the input csv file. If not provided, the csv parser will detect it automatically. |
COMMENTCHAR | Character used to comment the rows in the input csv file. Those rows will be skipped from processing |
HEADER | Whether the input csv files have header row |
FILEHEADER | If the header is not present in the input csv, the column names to be used for the data read from the input csv |
SORT_SCOPE | Sort Scope to be used for current load. |
MULTILINE | Whether a row data can span across multiple lines. |
ESCAPECHAR | Escape character used to escape the data in the input csv file. For example, \ is a standard escape character |
SKIP_EMPTY_LINE | Whether empty lines in input csv file should be skipped or loaded as null row |
COMPLEX_DELIMITER_LEVEL_1 | Starting delimiter for complex type data in input csv file |
COMPLEX_DELIMITER_LEVEL_2 | Ending delimiter for complex type data in input csv file |
COMPLEX_DELIMITER_LEVEL_3 | Ending delimiter for nested complex type data in input csv file of level 3. |
DATEFORMAT | Format of date in the input csv file |
TIMESTAMPFORMAT | Format of timestamp in the input csv file |
SORT_COLUMN_BOUNDS | How to partition the sort columns so that records are evenly distributed |
BAD_RECORDS_LOGGER_ENABLE | Whether to enable bad records logging |
BAD_RECORD_PATH | Bad records logging path. Useful when bad record logging is enabled |
BAD_RECORDS_ACTION | Behavior of data loading when bad record is found |
IS_EMPTY_DATA_BAD_RECORD | Whether empty data of a column to be considered as bad record or not |
GLOBAL_SORT_PARTITIONS | Number of partitions to use for shuffling of data during sorting |
SCALE_FACTOR | Control the partition size for RANGE_COLUMN feature |
You can use the following options to load data:
Delimiters can be provided in the load command.
OPTIONS('DELIMITER'=',')
Quote Characters can be provided in the load command.
OPTIONS('QUOTECHAR'='"')
Line separator Characters can be provided in the load command.
OPTIONS('LINE_SEPARATOR'='\n')
Comment Characters can be provided in the load command if the user wants to comment out lines.
OPTIONS('COMMENTCHAR'='#')
When you load a CSV file without a header row and its columns are in the same order as the table schema, add 'HEADER'='false' to the load data SQL, as the file header need not be provided. By default the value is 'true'. false: the CSV file has no header row. true: the CSV file has a header row.
OPTIONS('HEADER'='false')
NOTE: If the HEADER option exists and is set to 'true', then the FILEHEADER option is not required.
Headers can be provided in the LOAD DATA command if headers are missing in the source files.
OPTIONS('FILEHEADER'='column1,column2')
Sort Scope to be used for the current load. This overrides the Sort Scope of the table. Requirement: Sort Columns must be set while creating the table. If Sort Columns is null, Sort Scope is always NO_SORT.
OPTIONS('SORT_SCOPE'='GLOBAL_SORT')
Priority order for choosing Sort Scope is:
1. Load command
2. CARBON.TABLE.LOAD.SORT.SCOPE.<db>.<table> session property
3. Table level Sort Scope
4. CARBON.OPTIONS.SORT.SCOPE session property
5. Default value: NO_SORT
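As an illustrative sketch, assuming a Spark SQL session with the CarbonData extensions enabled and that these session properties can be set dynamically in your deployment, the sort scope could be set as follows (the database name 'default', table name 'sales', and the values are placeholders):
SET carbon.table.load.sort.scope.default.sales=LOCAL_SORT
SET carbon.options.sort.scope=NO_SORT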
CSV with a new line character in quotes:
OPTIONS('MULTILINE'='true')
Escape characters can be provided if the user wants strict validation of escape characters in CSV files.
OPTIONS('ESCAPECHAR'='\')
This option will ignore empty lines in the CSV file during the data load.
OPTIONS('SKIP_EMPTY_LINE'='TRUE/FALSE')
Split the complex type data column in a row (eg., a\001b\001c --> Array = {a,b,c}).
OPTIONS('COMPLEX_DELIMITER_LEVEL_1'='\001')
Split the complex type nested data column in a row. Applies level_1 delimiter & applies level_2 based on complex data type (eg., a\002b\001c\002d --> Array of Array = {{a,b},{c,d}}).
OPTIONS('COMPLEX_DELIMITER_LEVEL_2'='\002')
Split the complex type nested data column in a row. Applies level_1 delimiter, applies level_2 and then level_3 delimiter based on complex data type. Used in case of nested Complex Map type. (eg., 'a\003b\002b\003c\001aa\003bb\002cc\003dd' --> Array of Map = {{a -> b, b -> c},{aa -> bb, cc -> dd}}).
OPTIONS('COMPLEX_DELIMITER_LEVEL_3'='\003')
Date and Timestamp format for the specified columns.
OPTIONS('DATEFORMAT' = 'yyyy-MM-dd','TIMESTAMPFORMAT'='yyyy-MM-dd HH:mm:ss')
NOTE: Date formats are specified by date pattern strings. The date pattern in CarbonData is the same as in JAVA. Refer to SimpleDateFormat.
Range bounds for sort columns.
Suppose the table is created with 'SORT_COLUMNS'='name,id', the value range for name is aaa to zzz, and the value range for id is 0 to 1000. Then during data loading we can specify the following option to enhance data loading performance.
OPTIONS('SORT_COLUMN_BOUNDS'='f,250;l,500;r,750')
Each bound is separated by ';' and each field value in a bound is separated by ','. In the example above, we provide 3 bounds to distribute records to 4 partitions. The values 'f', 'l', 'r' can evenly distribute the records. Inside CarbonData, for each record we compare the value of the sort columns with the bounds and decide which partition the record will be forwarded to.
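As a sketch using the scenario above (the table name and file path are hypothetical), the bounds are passed like any other load option:
LOAD DATA INPATH 'hdfs://hacluster/user/loaddata/employees.csv'
INTO TABLE employees
OPTIONS('DELIMITER'=',', 'SORT_COLUMN_BOUNDS'='f,250;l,500;r,750')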
The handling of bad records can be controlled with the following options:
OPTIONS('BAD_RECORDS_LOGGER_ENABLE'='true', 'BAD_RECORD_PATH'='hdfs://hacluster/tmp/carbon', 'BAD_RECORDS_ACTION'='REDIRECT', 'IS_EMPTY_DATA_BAD_RECORD'='false')
Example:
LOAD DATA INPATH 'filepath.csv' INTO TABLE tablename
OPTIONS('BAD_RECORDS_LOGGER_ENABLE'='true','BAD_RECORD_PATH'='hdfs://hacluster/tmp/carbon',
'BAD_RECORDS_ACTION'='REDIRECT','IS_EMPTY_DATA_BAD_RECORD'='false')
If the SORT_SCOPE is defined as GLOBAL_SORT, then the user can specify the number of partitions to use while shuffling data for sort using GLOBAL_SORT_PARTITIONS. If it is not configured, or configured less than 1, then it uses the number of map tasks as reduce tasks. It is recommended that each reduce task deals with 512MB-1GB data. For RANGE_COLUMN, GLOBAL_SORT_PARTITIONS is used to specify the number of range partitions also. GLOBAL_SORT_PARTITIONS should be specified optimally during RANGE_COLUMN LOAD because if a higher number is configured then the load time may be less but it will result in the creation of more files which would degrade the query and compaction performance. Conversely, if fewer partitions are configured then the load performance may degrade due to less use of parallelism but the query and compaction will become faster. Hence the user may choose an optimal number depending on the use case.
OPTIONS('GLOBAL_SORT_PARTITIONS'='2')
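For instance, following the 512MB-1GB per reduce task recommendation above, roughly 20 GB of input data suggests a value somewhere between 20 and 40 (this sizing is only an illustration):
OPTIONS('GLOBAL_SORT_PARTITIONS'='20')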
For RANGE_COLUMN, SCALE_FACTOR is used to control the number of range partitions, as follows.
splitSize = max(blocklet_size, (block_size - blocklet_size)) * scale_factor
numPartitions = total size of input data / splitSize
The default value is 3, and the range is [1, 300].
OPTIONS('SCALE_FACTOR'='10')
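As a worked illustration, assume a block size of 1024 MB and a blocklet size of 64 MB (these sizes are assumptions for the example, not requirements) together with 'SCALE_FACTOR'='10':
splitSize = max(64 MB, (1024 MB - 64 MB)) * 10 = 9600 MB
numPartitions = 96 GB of input data / 9600 MB ≈ 10 range partitions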
This command inserts data into a CarbonData table. It is defined as a combination of two queries, Insert and Select. It inserts records from a source table into a target CarbonData table; the source table can be a Hive table, a Parquet table, or a CarbonData table itself. It also provides the ability to aggregate the records of a table by performing a Select query on the source table and loading the resultant records into a CarbonData table.
INSERT INTO TABLE <CARBONDATA TABLE> SELECT * FROM sourceTableName
[ WHERE { <filter_condition> } ]
The user can also omit the TABLE keyword and write the query as:
INSERT INTO <CARBONDATA TABLE> SELECT * FROM sourceTableName
[ WHERE { <filter_condition> } ]
Overwrite insert data:
INSERT OVERWRITE TABLE <CARBONDATA TABLE> SELECT * FROM sourceTableName
[ WHERE { <filter_condition> } ]
Examples
INSERT INTO table1 SELECT item1, sum(item2 + 1000) as result FROM table2 group by item1
INSERT INTO table1 SELECT item1, item2, item3 FROM table2 where item2='xyz'
INSERT OVERWRITE TABLE table1 SELECT * FROM TABLE2
Stage input files are data files written by an external application (such as Flink). These files are committed but not yet loaded into the table.
The user can use this command to insert them into the table, making them visible to queries.
INSERT INTO <CARBONDATA TABLE> STAGE OPTIONS(property_name=property_value, ...)
Supported Properties:
Property | Description |
---|---|
BATCH_FILE_COUNT | The number of stage files to process in each batch |
BATCH_FILE_ORDER | The order in which stage files are picked in each batch |
You can use the following options to load data:
The number of stage files to process in each batch.
OPTIONS('batch_file_count'='5')
The order in which stage files are picked in each batch; choices are ASC and DESC, and the default is ASC. Stage files are ordered by their last modified time with the specified order type.
OPTIONS('batch_file_order'='DESC')
Examples:
INSERT INTO table1 STAGE
INSERT INTO table1 STAGE OPTIONS('batch_file_count' = '5')
Note: This command uses the default file order and will insert the earliest stage files into the table.
INSERT INTO table1 STAGE OPTIONS('batch_file_count' = '5', 'batch_file_order'='DESC')
Note: This command will insert the latest stage files into the table.
This command allows you to load data using static partition.
LOAD DATA INPATH 'folder_path'
INTO TABLE [db_name.]table_name PARTITION (partition_spec)
OPTIONS(property_name=property_value, ...)
INSERT INTO TABLE [db_name.]table_name PARTITION (partition_spec) <SELECT STATEMENT>
Example:
LOAD DATA INPATH '${env:HOME}/staticinput.csv'
INTO TABLE locationTable
PARTITION (country = 'US', state = 'CA')
INSERT INTO TABLE locationTable
PARTITION (country = 'US', state = 'AL')
SELECT <columns list excluding partition columns> FROM another_user
This command allows you to load data using dynamic partition. If partition spec is not specified, then the partition is considered as dynamic.
Example:
LOAD DATA INPATH '${env:HOME}/staticinput.csv'
INTO TABLE locationTable
INSERT INTO TABLE locationTable
SELECT <columns list excluding partition columns> FROM another_user
This command allows you to update the CarbonData table based on the column expression and optional filter conditions.
UPDATE <table_name>
SET (column_name1, column_name2, ... column_name n) = (column1_expression , column2_expression, ... column n_expression )
[ WHERE { <filter_condition> } ]
Alternatively, the following command can also be used to update the CarbonData table:
UPDATE <table_name>
SET (column_name1, column_name2) =(select sourceColumn1, sourceColumn2 from sourceTable [ WHERE { <filter_condition> } ] )
[ WHERE { <filter_condition> } ]
NOTE: The update command fails if multiple input rows in the source table match a single row in the destination table.
Examples:
UPDATE t3 SET (t3_salary) = (t3_salary + 9) WHERE t3_name = 'aaa1'
UPDATE t3 SET (t3_date, t3_country) = ('2017-11-18', 'india') WHERE t3_salary < 15003
UPDATE t3 SET (t3_country, t3_name) = (SELECT t5_country, t5_name FROM t5 WHERE t5_id = 5) WHERE t3_id < 5
UPDATE t3 SET (t3_date, t3_serialname, t3_salary) = (SELECT '2099-09-09', t5_serialname, '9999' FROM t5 WHERE t5_id = 5) WHERE t3_id < 5
UPDATE t3 SET (t3_country, t3_salary) = (SELECT t5_country, t5_salary FROM t5 FULL JOIN t3 u WHERE u.t3_id = t5_id and t5_id=6) WHERE t3_id >6
NOTE: Updating complex datatype columns is not supported.
This command allows us to delete records from a CarbonData table.
DELETE FROM table_name [WHERE expression]
Examples:
DELETE FROM carbontable WHERE column1 = 'china'
DELETE FROM carbontable WHERE column1 IN ('china', 'USA')
DELETE FROM carbontable WHERE column1 IN (SELECT column11 FROM sourceTable2)
DELETE FROM carbontable WHERE column1 IN (SELECT column11 FROM sourceTable2 WHERE column1 = 'USA')
This command allows us to delete data files (stage data) which have already been loaded into the table.
DELETE FROM TABLE [db_name.]table_name STAGE OPTIONS(property_name=property_value, ...)
Supported Properties:
Property | Description |
---|---|
retain_hour | Data file retain time in hours |
You can use the following options to delete data:
Data file retention time in hours; the command deletes only the files that have exceeded this retention time.
OPTIONS('retain_hour'='1')
Examples:
DELETE FROM TABLE carbontable STAGE
DELETE FROM TABLE carbontable STAGE OPTIONS ('retain_hour'='1')
Compaction improves the query performance significantly.
There are several types of compaction.
ALTER TABLE [db_name.]table_name COMPACT 'MINOR/MAJOR/CUSTOM'
In Minor compaction, the user can specify the number of loads to be merged. Minor compaction is triggered for every data load if the parameter carbon.enable.auto.load.merge is set to true. If any segments are eligible to be merged, compaction runs in parallel with the data load. There are 2 levels in minor compaction:
* Level 1: Merging of the segments which are not yet compacted.
* Level 2: Merging of the compacted segments again to form a larger segment.
ALTER TABLE table_name COMPACT 'MINOR'
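As a hedged sketch, assuming your deployment allows this CarbonData property to be changed dynamically from the SQL session (otherwise configure it in carbon.properties), automatic minor compaction could be enabled with:
SET carbon.enable.auto.load.merge=true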
In Major compaction, multiple segments can be merged into one large segment. The user specifies the compaction size up to which segments can be merged. Major compaction is usually done during off-peak time. Configure the property carbon.major.compaction.size with an appropriate value in MB.
This command merges the specified number of segments into one segment:
ALTER TABLE table_name COMPACT 'MAJOR'
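Similarly, as a sketch only and assuming the property can be set dynamically in the session (otherwise use carbon.properties), the major compaction size threshold might be configured like this, where 2048 MB is just an example value:
SET carbon.major.compaction.size=2048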
In Custom compaction, the user can directly specify the segment ids to be merged into one large segment. All specified segment ids should exist and be valid; otherwise compaction will fail. Custom compaction is usually done during off-peak time.
ALTER TABLE table_name COMPACT 'CUSTOM' WHERE SEGMENT.ID IN (2,3,4)
Clean the segments which are compacted:
CLEAN FILES FOR TABLE carbon_table