数据格式
{ "年份" : "1990" , "国家补贴(亿元)" : "5.4" , "地方补贴(亿元)" : "3.2" , "企业补贴(亿元)" : "0.8" , "其他补贴(亿元)" : "0.5" }
{ "年份" : "1991" , "国家补贴(亿元)" : "5.8" , "地方补贴(亿元)" : "3.4" , "企业补贴(亿元)" : "0.9" , "其他补贴(亿元)" : "0.6" }
{ "年份" : "1992" , "国家补贴(亿元)" : "6.2" , "地方补贴(亿元)" : "3.7" , "企业补贴(亿元)" : "1" , "其他补贴(亿元)" : "0.7" }
{ "年份" : "1993" , "国家补贴(亿元)" : "7" , "地方补贴(亿元)" : "4.1" , "企业补贴(亿元)" : "1.2" , "其他补贴(亿元)" : "0.8" }
{ "年份" : "1994" , "国家补贴(亿元)" : "7.8" , "地方补贴(亿元)" : "4.5" , "企业补贴(亿元)" : "1.4" , "其他补贴(亿元)" : "0.9" }
{ "年份" : "1995" , "国家补贴(亿元)" : "8.5" , "地方补贴(亿元)" : "4.9" , "企业补贴(亿元)" : "1.6" , "其他补贴(亿元)" : "1" }
{ "年份" : "1996" , "国家补贴(亿元)" : "9.2" , "地方补贴(亿元)" : "5.3" , "企业补贴(亿元)" : "1.8" , "其他补贴(亿元)" : "1.1" }
{ "年份" : "1997" , "国家补贴(亿元)" : "10" , "地方补贴(亿元)" : "5.7" , "企业补贴(亿元)" : "2" , "其他补贴(亿元)" : "1.2" }
{ "年份" : "1998" , "国家补贴(亿元)" : "10.8" , "地方补贴(亿元)" : "6.1" , "企业补贴(亿元)" : "2.2" , "其他补贴(亿元)" : "1.3" }
{ "年份" : "1999" , "国家补贴(亿元)" : "11.6" , "地方补贴(亿元)" : "6.6" , "企业补贴(亿元)" : "2.5" , "其他补贴(亿元)" : "1.4" }
{ "年份" : "2000" , "国家补贴(亿元)" : "12.5" , "地方补贴(亿元)" : "7.2" , "企业补贴(亿元)" : "2.8" , "其他补贴(亿元)" : "1.6" }
{ "年份" : "2001" , "国家补贴(亿元)" : "13.5" , "地方补贴(亿元)" : "7.9" , "企业补贴(亿元)" : "3.2" , "其他补贴(亿元)" : "1.8" }
{ "年份" : "2002" , "国家补贴(亿元)" : "14.5" , "地方补贴(亿元)" : "8.7" , "企业补贴(亿元)" : "3.7" , "其他补贴(亿元)" : "2" }
{ "年份" : "2003" , "国家补贴(亿元)" : "15.6" , "地方补贴(亿元)" : "9.6" , "企业补贴(亿元)" : "4.3" , "其他补贴(亿元)" : "2.2" }
{ "年份" : "2004" , "国家补贴(亿元)" : "16.8" , "地方补贴(亿元)" : "10.6" , "企业补贴(亿元)" : "5" , "其他补贴(亿元)" : "2.5" }
{ "年份" : "2005" , "国家补贴(亿元)" : "18.2" , "地方补贴(亿元)" : "11.7" , "企业补贴(亿元)" : "5.8" , "其他补贴(亿元)" : "2.8" }
{ "年份" : "2006" , "国家补贴(亿元)" : "19.8" , "地方补贴(亿元)" : "12.9" , "企业补贴(亿元)" : "6.7" , "其他补贴(亿元)" : "3.2" }
{ "年份" : "2007" , "国家补贴(亿元)" : "21.5" , "地方补贴(亿元)" : "14.3" , "企业补贴(亿元)" : "7.7" , "其他补贴(亿元)" : "3.7" }
{ "年份" : "2008" , "国家补贴(亿元)" : "23.3" , "地方补贴(亿元)" : "15.9" , "企业补贴(亿元)" : "8.8" , "其他补贴(亿元)" : "4.3" }
{ "年份" : "2009" , "国家补贴(亿元)" : "25.2" , "地方补贴(亿元)" : "17.6" , "企业补贴(亿元)" : "10.1" , "其他补贴(亿元)" : "5" }
{ "年份" : "2010" , "国家补贴(亿元)" : "27.2" , "地方补贴(亿元)" : "19.4" , "企业补贴(亿元)" : "11.6" , "其他补贴(亿元)" : "5.8" }
{ "年份" : "2011" , "国家补贴(亿元)" : "29.2" , "地方补贴(亿元)" : "21.3" , "企业补贴(亿元)" : "13.3" , "其他补贴(亿元)" : "6.7" }
{ "年份" : "2012" , "国家补贴(亿元)" : "31.3" , "地方补贴(亿元)" : "23.4" , "企业补贴(亿元)" : "15.2" , "其他补贴(亿元)" : "7.7" }
{ "年份" : "2013" , "国家补贴(亿元)" : "33.5" , "地方补贴(亿元)" : "25.6" , "企业补贴(亿元)" : "17.3" , "其他补贴(亿元)" : "8.8" }
{ "年份" : "2014" , "国家补贴(亿元)" : "35.8" , "地方补贴(亿元)" : "27.9" , "企业补贴(亿元)" : "19.6" , "其他补贴(亿元)" : "10" }
{ "年份" : "2015" , "国家补贴(亿元)" : "38.2" , "地方补贴(亿元)" : "30.3" , "企业补贴(亿元)" : "22.1" , "其他补贴(亿元)" : "11.4" }
{ "年份" : "2016" , "国家补贴(亿元)" : "40.7" , "地方补贴(亿元)" : "32.8" , "企业补贴(亿元)" : "24.9" , "其他补贴(亿元)" : "13.1" }
{ "年份" : "2017" , "国家补贴(亿元)" : "43.3" , "地方补贴(亿元)" : "35.5" , "企业补贴(亿元)" : "27.9" , "其他补贴(亿元)" : "15.2" }
{ "年份" : "2018" , "国家补贴(亿元)" : "46.2" , "地方补贴(亿元)" : "38.3" , "企业补贴(亿元)" : "31.2" , "其他补贴(亿元)" : "17.6" }
{ "年份" : "2019" , "国家补贴(亿元)" : "49.3" , "地方补贴(亿元)" : "41.3" , "企业补贴(亿元)" : "34.8" , "其他补贴(亿元)" : "20.3" }
{ "年份" : "2020" , "国家补贴(亿元)" : "52.5" , "地方补贴(亿元)" : "44.6" , "企业补贴(亿元)" : "38.7" , "其他补贴(亿元)" : "23.5" }
{ "年份" : "2021" , "国家补贴(亿元)" : "55.9" , "地方补贴(亿元)" : "48.2" , "企业补贴(亿元)" : "42.8" , "其他补贴(亿元)" : "27.1" }
{ "年份" : "2022" , "国家补贴(亿元)" : "59.4" , "地方补贴(亿元)" : "52.1" , "企业补贴(亿元)" : "47.3" , "其他补贴(亿元)" : "31.4" }
{ "年份" : "2023" , "国家补贴(亿元)" : "63.1" , "地方补贴(亿元)" : "56.5" , "企业补贴(亿元)" : "52.4" , "其他补贴(亿元)" : "36.2" }
javabean
package cn. lhz. bean ; import cn. lhz. util. annotation. RowKeyAnnotation ;
import lombok. AllArgsConstructor ;
import lombok. Getter ;
import lombok. NoArgsConstructor ;
import lombok. Setter ;
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class SubsidyYear { @RowKeyAnnotation private Integer year; private double country; private double local; private double enterprise; private double other; @Override public String toString ( ) { return this . year + "\t" + this . country + "," + this . local + "," + this . enterprise + "," + this . other; }
}
package cn. lhz. etl ; import cn. lhz. bean. SubsidyYear ;
import cn. lhz. util. hbase . HbaseUtil ;
import cn. lhz. util. string. StringUtil ;
import org. apache. hadoop . conf. Configuration ;
import org. apache. hadoop . fs. FileSystem ;
import org. apache. hadoop . fs. Path ;
import org. apache. hadoop . hbase . client. Connection ;
import org. apache. hadoop . hbase . client. Table ;
import org. apache. hadoop . io. LongWritable ;
import org. apache. hadoop . io. Text ;
import org. apache. hadoop . mapreduce . Job ;
import org. apache. hadoop . mapreduce . Mapper ;
import org. apache. hadoop . mapreduce . Reducer ;
import org. apache. hadoop . mapreduce . lib. input. FileInputFormat ;
import org. apache. hadoop . mapreduce . lib. output. FileOutputFormat ; import java. io. IOException ;
import java. lang. reflect. InvocationTargetException ;
public class SubsidyYear2Hbase { public static class SubsidyYearMapper extends Mapper < LongWritable , Text , Text , Text > { @Override protected void map ( LongWritable key, Text value, Mapper < LongWritable , Text , Text , Text > . Context context) throws IOException , InterruptedException { String json = value. toString ( ) ; String csv = StringUtil . extractValuesToString ( json) ; System . out. println ( csv) ; System . out. println ( "key >>> " + csv. substring ( 0 , csv. indexOf ( "," ) ) ) ; System . out. println ( "value >>> " + csv. substring ( csv. indexOf ( "," ) + 1 ) ) ; Text outKey = new Text ( csv. substring ( 0 , csv. indexOf ( "," ) ) ) ; Text outValue = new Text ( csv. substring ( csv. indexOf ( "," ) + 1 ) ) ; context. write ( outKey, outValue) ; } } public static class SubsidyYearReducer extends Reducer < Text , Text , Text , Text > { private Connection connection; public Table table; @Override protected void setup ( Reducer < Text , Text , Text , Text > . Context context) throws IOException , InterruptedException { connection = HbaseUtil . getConnection ( ) ; String tableName = "SUBSIDY_YEAR" ; table = HbaseUtil . getTable ( connection, tableName) ; } @Override protected void reduce ( Text key, Iterable < Text > values, Reducer < Text , Text , Text , Text > . Context context) throws IOException , InterruptedException { String csv = "" ; for ( Text value : values) { csv = value. toString ( ) ; } try { SubsidyYear subsidyYear = StringUtil . csv2Bean ( csv, false , SubsidyYear . class ) ; subsidyYear. setYear ( Integer . parseInt ( key. toString ( ) ) ) ; HbaseUtil . upsert ( table, "OVER_THE_YEARS" , subsidyYear) ; } catch ( IllegalAccessException | NoSuchMethodException | InvocationTargetException | InstantiationException e) { throw new RuntimeException ( e) ; } } @Override protected void cleanup ( Reducer < Text , Text , Text , Text > . Context context) throws IOException , InterruptedException { if ( table != null ) { table. close ( ) ; } if ( connection != null ) { connection. close ( ) ; } } } public static void main ( String [ ] args) throws IOException , InterruptedException , ClassNotFoundException { System . setProperty ( "HADOOP_USER_NAME" , "root" ) ; Configuration conf = new Configuration ( ) ; conf. set ( "mapreduce .app-submission.cross-platform" , "true" ) ; conf. set ( "mapreduce .framework.name" , "local" ) ; conf. set ( "mapreduce .cluster.local.dir" , "file:///home/lhz/hadoop " ) ; Job job = Job . getInstance ( conf, "教育历年补贴" ) ; job. setJarByClass ( SubsidyYear2Hbase . class ) ; job. setMapperClass ( SubsidyYearMapper . class ) ; job. setReducerClass ( SubsidyYearReducer . class ) ; job. setOutputKeyClass ( Text . class ) ; job. setOutputValueClass ( Text . class ) ; FileInputFormat . addInputPath ( job, new Path ( "/edu-ods/教育补贴.log" ) ) ; Path path = new Path ( "/edu-dwd" ) ; FileSystem fs = path. getFileSystem ( conf) ; if ( fs. exists ( path) ) { fs. delete ( path, true ) ; } FileOutputFormat . setOutputPath ( job, path) ; System . exit ( job. waitForCompletion ( true ) ? 0 : 1 ) ; }
}