用 Spark 实现 K-means 算法
package kmeans;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;import scala.Tuple2;public class kmeans{static double[][] center = new double[4][2]; static int[] number = new int[4]; static double[][] new_center = new double[4][2]; public static void main(String[] args) {ArrayList<String> arrayList = new ArrayList<String>();try {File file = new File("/usr/local/hadoop-2.7.3/centers.txt");InputStreamReader input = new InputStreamReader(new FileInputStream(file));BufferedReader bf = new BufferedReader(input);String str;while ((str = bf.readLine()) != null) {arrayList.add(str);}bf.close();input.close();} catch (IOException e) {e.printStackTrace();}for (int i = 0; i < 4; i++) {for (int j = 0; j < 2; j++) {String s = arrayList.get(i).split(",")[j];center[i][j] = Double.parseDouble(s);}}SparkConf conf = new SparkConf().setAppName("kmeans").setMaster("local[*]");JavaSparkContext jsc = new JavaSparkContext(conf);JavaRDD<String> datas = jsc.textFile("spark/input4/k-means.dat"); while(true) {for (int i = 0; i< 4;i++) {number[i]=0;}JavaPairRDD<Integer, Tuple2<Double, Double>> data = datas.mapToPair(new PairFunction<String, Integer,Tuple2<Double, Double>>() {private static final long serialVersionUID = 1L;@Overridepublic Tuple2<Integer,Tuple2<Double, Double>> call(String str) throws Exception {final double[][] loc = center; String[] datasplit = str.split(","); double x = Double.parseDouble(datasplit[0]);double y = Double.parseDouble(datasplit[1]);double minDistance = 99999999;int centerIndex = 0; for(int i = 0;i < 4;i++){double itsDistance = (x-loc[i][0])*(x-loc[i][0])+(y-loc[i][1])*(y-loc[i][1]);if(itsDistance < minDistance){minDistance = itsDistance;centerIndex = i; }}number[centerIndex]++; return new Tuple2<Integer,Tuple2<Double, Double>>(centerIndex, new Tuple2<Double,Double>(x,y));}});JavaPairRDD<Integer, Iterable<Tuple2<Double, Double>>> sum_center = data.groupByKey();JavaPairRDD<Integer,Tuple2<Double, Double>> Ncenter = sum_center.mapToPair(new PairFunction<Tuple2<Integer, 
Iterable<Tuple2<Double, Double>>>,Integer,Tuple2<Double, Double>>() {private static final long serialVersionUID = 1L;@Overridepublic Tuple2<Integer, Tuple2<Double, Double>> call(Tuple2<Integer, Iterable<Tuple2<Double, Double>>> a)throws Exception {int sum_x = 0;int sum_y = 0;Iterable<Tuple2<Double, Double>> it = a._2;for(Tuple2<Double, Double> i : it) {sum_x += i._1;sum_y +=i._2;}double average_x = sum_x / number[a._1];double average_y = sum_y/number[a._1];return new Tuple2<Integer,Tuple2<Double,Double>>(a._1,new Tuple2<Double,Double>(average_x,average_y));} }); Ncenter.foreach(new VoidFunction<Tuple2<Integer,Tuple2<Double,Double>>>() {private static final long serialVersionUID = 1L;@Overridepublic void call(Tuple2<Integer,Tuple2<Double,Double>> t) throws Exception {new_center[t._1][0] = t._2()._1;new_center[t._1][1] = t._2()._2;System.out.println("the new center: "+ t._1+" "+t._2()._1+" , "+t._2()._2);}});double distance = 0;for(int i=0;i<4;i++) { distance += (center[i][0]-new_center[i][0])*(center[i][0]-new_center[i][0]) + (center[i][1]-new_center[i][1])*(center[i][1]-new_center[i][1]); }if(distance == 0.0) {for(int j = 0;j<4;j++) {System.out.println("the final center: "+" "+center[j][0]+" , "+center[j][1]);}break;}else {for(int i = 0;i<4;i++) {center[i][0] = new_center[i][0];center[i][1] = new_center[i][1];new_center[i][0] = 0;new_center[i][1] = 0;System.out.println("the new center: "+" "+center[i][0]+" , "+center[i][1]);}}}}}
输入:
1. centers.txt:存放 4 个初始聚类中心,每行一个点的坐标 x,y:
96,826
606,776
474,866
400,768
2. data.dat:存放所有点的坐标,每行一个点 x,y(代码中读取的路径为 spark/input4/k-means.dat)。
96,826
606,776
474,866
400,768
2,920
356,766
36,687
-26,824
422,744
145,124
-21,207
154,781
157,872
-66,97
112,126
538,181
2,758
361,152
103,850
5,68
218,249
206,318
604,877
32,327
531,833
559,-17
237,255
485,196
166,784
86,913
-23,220
184,313
175,161
193,999
520,878
505,162
398,108
330,830
528,700
546,877
388,709
102,891
439,863
525,171
470,947
461,757
538,84
43,672
-37,442
534,70
-59,221
196,235
469,317
176,908
62,190
591,211
78,224
623,57
312,167
164,205
451,783
461,-14
309,915
487,848
580,275
16,833
605,991
495,292
219,790
167,889
205,195
224,255
-11,233
99,209
468,932
370,96
509,900
142,83
577,-3
-63,259
584,916
136,789
227,845
100,256
505,688
454,149
486,160
426,292
18,876
534,722
-37,92
574,191
-8,794
51,845
498,107
105,907
55,47
540,668
494,329
448,201
113,191
-8,801
191,582
573,634
117,214
48,235
311,-55
463,684
38,852
74,638
698,68
591,745
553,843
32,201
199,824
491,789
547,308
135,859
128,146
391,791
15,733
449,259
450,280
142,89
128,960
500,204
469,738
500,226
100,902
527,807
535,648
151,394
87,863
373,671
413,223
499,255
443,918
435,779
77,276
121,751
495,197
485,299
428,754
509,-40
361,893
90,192
490,688
44,753
545,867
501,908
528,291
101,937
118,110
52,790
144,711
355,814
580,305
16,788
596,316
57,186
508,713
109,835
518,216
604,757
608,276
727,859
-52,232
497,266
659,748
555,965
113,293
583,611
536,732
568,663
228,807
452,204
83,159
97,296
560,174
461,722
574,787
376,877
14,687
221,715
69,979
580,137
709,637
133,689
72,849
415,214
177,870
513,173
160,179
118,769
352,706
-17,607
490,96
87,425
282,756
592,177
182,-3
-79,338
463,822
87,910
131,791
431,279
687,677
230,634
627,809
127,227
422,302
297,344
101,672
414,342
0,262
464,1027
650,909
591,797
-2,154
208,753
315,734
136,856
402,224
508,76
207,126
528,378
375,226
433,728
67,832
469,120
49,805
382,734
582,104
7,271
556,1008
145,96
623,767
466,272
139,910
446,255
436,-35
463,818
162,266
162,266
73,923
125,838
34,248
368,902
-32,672
473,22
73,189
388,316
133,883
99,228
319,320
474,876
576,252
43,733
89,197
-25,162
173,812
523,841
61,827
457,720
180,238
51,703
120,374
104,788
161,227
469,408
-29,721
105,773
556,217
492,849
411,9
654,387
166,791
470,181
49,124
476,864
148,263
593,193
152,259
130,879
-15,185
639,220
559,249
349,871
451,617
419,220
51,167
-13,193
598,189
452,252
115,1019
134,909
-62,794
109,298
19,749
92,193
469,236
254,156
221,720
528,98
708,683
653,377
47,57
482,806
349,814
547,744
94,765
495,132
101,385
22,180
-44,63
509,121
157,221
560,171
149,124
433,954
138,119
483,912
160,184
436,275
46,663
504,224
69,289
151,228
72,193
574,685
157,862
379,814
452,330
538,180
129,747
60,855
168,265
95,849
155,711
240,203
98,176
175,558
480,771
18,636
79,829
50,148
36,707
568,222
513,260
199,49
564,460
117,869
469,308
381,661
10,193
539,346
83,773
341,288
-19,887
756,178
160,887
20,116
15,777
307,748
56,870
125,240
468,1008
424,841
517,872
198,752
605,721
125,180
65,827
540,657
63,771
314,708
376,289
459,254
89,853
180,285
404,68
451,830
-94,206
545,804
501,856
102,230
574,834
480,855
668,394
548,600
-87,266
149,885
345,237
599,149
239,148
608,963
364,202
69,799
553,232
471,818
154,180