Computing Scientometrics in Large-Scale Academic Search Engines with MapReduce

In this paper we introduced four algorithms for computing numerous scientometrics in parallel using the Hadoop/MapReduce framework. Here we provide the Java implementations of the Map and Reduce functions of all four methods presented in the paper. The code was developed with Hadoop 0.20.2 and was debugged on a Core i7 machine with 12 GB of RAM running Ubuntu Linux 10.04 LTS, 64-bit.

For the experimental requirements of this work we used the CiteSeerX dataset, an open repository comprising approximately 1.8 million research articles. The dataset comes in three versions: unstructured, semi-structured and fully structured. The first includes the raw text of the 1.8 million articles, whereas the other two include the articles' metadata expressed in XML and SQL formats respectively. For the specific application we examine here, the XML version is the most appropriate. For more information, you may read the paper or contact me. Here is a sample XML document drawn from the CiteSeerX XML collection.
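
A purely illustrative fragment of such a record follows (it is not the actual CiteSeerX sample; the identifiers, author names and surrounding elements are invented). It shows only the elements that the Mappers on this page rely on: the <citation id="..."> blocks together with their <clusterid> (the cited publication's identifier) and <authors> (a comma-separated author list) children.

<article>
   ...
   <citations>
      <citation id="1">
         <clusterid>84578</clusterid>
         <authors>A. Author,B. Writer</authors>
      </citation>
      <citation id="2">
         <clusterid>19034</clusterid>
         <authors>C. Scholar</authors>
      </citation>
   </citations>
   ...
</article>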

Our purpose is to process the input XML files and produce a sequence of [author_name, metric_value] pairs. In the following examples we mainly focus on the calculation of the h-index; consequently, we set score=1.0. For the other two metrics, the trend h-index and the contemporary h-index, please consult Table 2 in the paper. Notice that processing the 1.8 million XML files individually is very inefficient, since the underlying distributed file system (HDFS) is designed for optimal performance with considerably larger files (e.g., 64 MB and above). For this reason, we should pack many (thousands) of these XML files into line-sequential files of larger sizes. For this particular dataset, we packed the 1.8 million small-sized XML files into 432 binary files occupying 64 MB each.
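
The packing step is not part of the MapReduce code itself. The following stand-alone sketch shows one possible way to perform it (the file and directory names are hypothetical): every small XML file is collapsed onto a single line and appended to the current output file until roughly 64 MB have been written, so that TextInputFormat later hands each map() call one complete article record.

import java.io.*;
import java.nio.file.*;

public class XmlPacker {
   private static final long TARGET_SIZE = 64L * 1024 * 1024;   // ~64 MB per packed file

   public static void main(String[] args) throws IOException {
      File[] xmlFiles = new File("citeseerx-xml").listFiles();   // hypothetical input directory
      int chunk = 0;
      long written = 0;
      BufferedWriter out = new BufferedWriter(new FileWriter("packed-" + chunk + ".txt"));

      for (File f : xmlFiles) {
         // Collapse each document onto a single line so that TextInputFormat
         // later delivers one complete article record per map() call.
         String doc = new String(Files.readAllBytes(f.toPath()), "UTF-8").replaceAll("[\\r\\n]+", " ");
         out.write(doc);
         out.newLine();
         written += doc.length() + 1;

         if (written >= TARGET_SIZE) {   // start a new ~64 MB output file
            out.close();
            chunk++;
            written = 0;
            out = new BufferedWriter(new FileWriter("packed-" + chunk + ".txt"));
         }
      }
      out.close();
   }
}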

You can download the full code, the accompanying classes, and a small input dataset by clicking here.
Note: Researchers who use this code are kindly requested to cite the aforementioned article in their work.
Please also check the relevant GitHub repository.

Method 1

In this method the Mapper class parses the input documents and extracts their references. For each of these references, we identify all of its authors, and for each author we emit one [author_name, (publication_id, score)] tuple to the Reducer.

Method 1: Mapper class (Algorithm 1 of the paper, following step 8a)
public static class Map1 extends Mapper<LongWritable, Text, Text, MapOutputValue> {
   private Text word = new Text();
   private int s = 0, e = 0, s1 = 0, e1 = 0;

   public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
      String line = value.toString();
      String refXML = new String();
      String refID = new String();
      String ReferenceAuthors = new String();
      int refIDint;

      s = 0; e = 0;
      while (s != -1) {
         s = line.indexOf("<citation id=\"", s);
         if (s > 0) {
            s += 14;
            e = line.indexOf("</citation>", s);
            refXML = line.substring(s, e);

            s1 = 0; e1 = 0;
            s1 = refXML.indexOf("<clusterid>");

            if (s1 > 0) {
               s1 += 11;
               e1 = refXML.indexOf("</clusterid>", s1);
               refID = refXML.substring(s1, e1);
               refIDint = Integer.parseInt(refID);

               s1 = 0; e1 = 0;
               s1 = refXML.indexOf("<authors>");
               if (s1 > 0) {
                  s1 += 9;
                  e1 = refXML.indexOf("</authors>", s1);
                  ReferenceAuthors = refXML.substring(s1, e1);

                  String ReferenceAuthorsArray[] = ReferenceAuthors.split(",");
                  for (String a : ReferenceAuthorsArray) {
                     if (a.length() > 0 && refIDint > 0) {
                        MapOutputValue citationScore = new MapOutputValue(refIDint, 1.0f);
                        word.set(a);
                        context.write(word, citationScore);
                     }
                  }
               }
            }
         }
      }
   }
}

The Reducer class then receives the [author_name, (publication_id, score)] tuples and generates the required [author_name, metric_value] pairs. Notice that the input tuples arrive at the Reducer in ascending author_name order.

Method 1: Reducer class (Algorithm 2 of the paper)
public static class Reduce1 extends Reducer<Text, MapOutputValue, Text, IntWritable> {
   public void reduce(Text key, Iterable<MapOutputValue> values, Context context) throws IOException, InterruptedException {
      Hashtable<Integer, Float> citations = new Hashtable<Integer, Float>();
      int metric = 0, entry = 0;
      float score = 0.0f;

      for (MapOutputValue val : values) {
         if (citations.get(val.get_paperID()) == null) {
            citations.put(val.get_paperID(), val.get_paperScore());
         } else {
            citations.put(val.get_paperID(), val.get_paperScore() + citations.get(val.get_paperID()).floatValue());
         }
      }

      Vector<Float> v = new Vector<Float>(citations.values());
      Collections.sort(v, Collections.reverseOrder());
      Iterator<Float> float_it = v.iterator();

      while (float_it.hasNext()) {
         entry++;
         score = (Float)float_it.next();
         if (score >= entry) {
            metric++;
         } else {
            break;
         }
      }
      context.write(key, new IntWritable(metric));
   }
}
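
The loop above is the standard h-index computation when every citation contributes score=1.0: after sorting the per-paper scores in descending order, the metric is incremented for as long as the score at position entry is at least entry. As a small stand-alone illustration (plain Java, outside Hadoop), the following applies the same loop to a toy citation vector:

import java.util.*;

public class HIndexDemo {
   public static void main(String[] args) {
      // Toy per-paper citation scores of a single author.
      Vector<Float> v = new Vector<Float>(Arrays.asList(4.0f, 1.0f, 3.0f, 2.0f));
      Collections.sort(v, Collections.reverseOrder());   // [4.0, 3.0, 2.0, 1.0]

      int entry = 0, metric = 0;
      for (Float score : v) {
         entry++;
         if (score >= entry) { metric++; } else { break; }
      }
      System.out.println("h-index = " + metric);          // prints: h-index = 2
   }
}

Sorted in descending order the vector becomes [4.0, 3.0, 2.0, 1.0]; the first two entries satisfy score >= position, while the third (2.0 < 3) does not, so the loop reports a metric of 2.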

Notice that the value field emitted by the Mapper is a (publication_id, score) pair. Such (int, float) pairs are not built into the Hadoop framework. For this reason, we implement the custom datatype MapOutputValue in a separate class that determines how the system reads and writes such values. The implementation of MapOutputValue is provided in the MapOutputValue class section below.

Method 2

In this method, for each author of each paper reference, the Mapper emits one [(author_name, publication_id), score] tuple to the Reducer.

Method 2: Mapper class (Algorithm 1 of the paper, following step 8b)
public static class Map2 extends Mapper<LongWritable, Text, MapOutputKey, FloatWritable> {
   private FloatWritable score = new FloatWritable(1.0f);
   private int s = 0, e = 0, s1 = 0, e1 = 0;

   public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
      String line = value.toString();
      String refXML = new String();
      String refID = new String();
      String ReferenceAuthors = new String();
      int refIDint;

      s = 0; e = 0;
      while (s != -1) {
         s = line.indexOf("<citation id=\"", s);
         if (s > 0) {
            s += 14;
            e = line.indexOf("</citation>", s);
            refXML = line.substring(s, e);

            s1 = 0; e1 = 0;
            s1 = refXML.indexOf("<clusterid>");

            if (s1 > 0) {
               s1 += 11;
               e1 = refXML.indexOf("</clusterid>", s1);
               refID = refXML.substring(s1, e1);
               refIDint = Integer.parseInt(refID);

               s1 = 0; e1 = 0;
               s1 = refXML.indexOf("<authors>");
               if (s1 > 0) {
                  s1 += 9;
                  e1 = refXML.indexOf("</authors>", s1);
                  ReferenceAuthors = refXML.substring(s1, e1);

                  String ReferenceAuthorsArray[] = ReferenceAuthors.split(",");
                  for (String a : ReferenceAuthorsArray) {
                     if (a.length() > 0 && refIDint > 0) {
                        MapOutputKey custom_key = new MapOutputKey(a, refIDint);
                        context.write(custom_key, score);
                     }
                  }
               }
            }
         }
      }
   }
}

Method 2 relies on a secondary sort: the tuples are brought to the Reducer not only in ascending author_name order (as in Method 1), but also in ascending publication_id order within each author; this ordering is dictated by the compareTo method of the composite MapOutputKey class. The implementation of the Reducer class for Method 2 follows:

Method 2: Reducer class (Algorithm 3 of the paper)
public static class Reduce2 extends Reducer<MapOutputKey, FloatWritable, Text, IntWritable> {
   int reduce_start = 1, npapers = 0, previous_paperID = 0;
   float score = 0.0f;
   Text previous_author = new Text();
   Vector<Float> v = new Vector<Float>();

   public void reduce(MapOutputKey key, Iterable<FloatWritable> values, Context context) throws IOException, InterruptedException {
      int entry = 0; int metric = 0;
      for (FloatWritable val : values) {
         if (key.get_author().equals(previous_author)) {
            if (key.get_paperID() == previous_paperID) {
               v.set(npapers, v.get(npapers) + val.get());
            } else {
               v.add(val.get());
               npapers++;
               previous_paperID = key.get_paperID();
            }
         } else {
            if (reduce_start == 0) {
               Collections.sort(v, Collections.reverseOrder());
               Iterator<Float> float_it = v.iterator();
               entry = 0; metric = 0;

               while (float_it.hasNext()) {
                  entry++;
                  score = (Float)float_it.next();
                  if (score >= entry) {
                     metric++;
                  } else {
                     break;
                  }
               }
               context.write(previous_author, new IntWritable(metric));
            }

            v.clear();
            previous_author.set(key.get_author());
            previous_paperID = key.get_paperID();
            v.add(val.get());
            npapers = 0;
            reduce_start = 0;
         }
      }
   }

   // Without a final flush, the metric of the last author processed by this reduce
   // task would never be written; we therefore emit it in cleanup().
   public void cleanup(Context context) throws IOException, InterruptedException {
      if (reduce_start == 0) {
         Collections.sort(v, Collections.reverseOrder());
         int entry = 0, metric = 0;
         for (Float c : v) {
            entry++;
            if (c >= entry) {
               metric++;
            } else {
               break;
            }
         }
         context.write(previous_author, new IntWritable(metric));
      }
   }
}

Here the custom datatype is used in the key field of the Mapper output. Therefore, we implement the custom datatype MapOutputKey in a separate class that determines how the system reads and writes such keys. However, since the custom datatype serves as a key, we must additionally specify how the system sorts such keys (the compareTo method). The implementation of MapOutputKey is provided in the MapOutputKey class section below.
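
Note also that the Reducer of Method 2 keeps per-author state across successive reduce() calls, so all (author_name, publication_id) keys that share the same author must be routed to the same reduce task. The driver listed further below does not show how this is arranged; a minimal sketch of one possible partitioner (the class name AuthorPartitioner is hypothetical) that partitions on the author field alone would look as follows.

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.mapreduce.Partitioner;

// Routes every key of the same author to the same reduce task, so that the
// per-author state maintained in Reduce2 sees all of an author's papers.
public class AuthorPartitioner extends Partitioner<MapOutputKey, FloatWritable> {
   @Override
   public int getPartition(MapOutputKey key, FloatWritable value, int numPartitions) {
      return (key.get_author().hashCode() & Integer.MAX_VALUE) % numPartitions;
   }
}

Such a partitioner would be registered in the driver with job.setPartitionerClass(AuthorPartitioner.class) for methods 2 and 2-C.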

Method 1-C

Despite their differences in tuple formatting, the Mappers of both Methods 1 and 2 emit data to the Reducers each time an author of a paper reference is extracted. Since we do not check whether the key currently being processed has already been sent, the same key is inevitably transmitted multiple times. Here we present Method 1-C (where the letter "C" signifies the presence of an in-Mapper Combiner), which extends the aforementioned Method 1. The cornerstone of Method 1-C is to avoid multiple emissions of identical author names and thus save valuable network bandwidth.

Method 1-C: Mapper and Reducer classes (Algorithm 4 of the paper)
public static class Map3 extends Mapper<LongWritable, Text, Text, MapOutputValueCombiner> {
   private Text word = new Text();
   private int s = 0, e = 0, s1 = 0, e1 = 0;
   Hashtable<String, MapOutputValueCombiner> authors_papers = new Hashtable<String, MapOutputValueCombiner>();

   public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
      String line = value.toString();
      String refXML = new String();
      String refID = new String();
      String ReferenceAuthors = new String();
      int refIDint;

      s = 0; e = 0;
      while (s != -1) {
         s = line.indexOf("<citation id=\"", s);
         if (s > 0) {
            s += 14;
            e = line.indexOf("</citation>", s);
            refXML = line.substring(s, e);

            s1 = 0; e1 = 0;
            s1 = refXML.indexOf("<clusterid>");
            if (s1 > 0) {
               s1 += 11;
               e1 = refXML.indexOf("</clusterid>", s1);
               refID = refXML.substring(s1, e1);
               refIDint = Integer.parseInt(refID);

               s1 = 0; e1 = 0;
               s1 = refXML.indexOf("<authors>");
               if (s1 > 0) {
                  s1 += 9;
                  e1 = refXML.indexOf("</authors>", s1);
                  ReferenceAuthors = refXML.substring(s1, e1);

                  String ReferenceAuthorsArray[] = ReferenceAuthors.split(",");
                  for (String a : ReferenceAuthorsArray) {
                     if (a.length() > 0 && refIDint > 0) {
                        if (authors_papers.containsKey(a)) {
                           MapOutputValueCombiner movc = authors_papers.get(a);
                           movc.insert(refIDint, 1.0f);
                           authors_papers.put(a, movc);
                        } else {
                           MapOutputValueCombiner movc = new MapOutputValueCombiner(refIDint, 1.0f);
                           authors_papers.put(a, movc);
                        }
                     }
                  }
               }
            }
         }
      }
   }

   public void cleanup(Context context) throws IOException, InterruptedException {
      // Emit the locally aggregated (author, list of (paper, score)) pairs once,
      // after the entire input split has been processed.
      Set<String> key_set = authors_papers.keySet();
      Iterator<String> itr = key_set.iterator();
      String str;

      while (itr.hasNext()) {
         str = itr.next();
         word.set(str);
         context.write(word, authors_papers.get(str));
      }
   }
}

public static class Reduce3 extends Reducer<Text, MapOutputValueCombiner, Text, IntWritable> {
   public void reduce(Text key, Iterable<MapOutputValueCombiner> values, Context context) throws IOException, InterruptedException {
      Hashtable<Integer, Float> citations = new Hashtable<Integer, Float>(); 
      Vector<MapOutputValue> vec = new Vector<MapOutputValue>();
      int metric = 0, entry = 0, num_vector_values = 0;
      float score = 0.0f;

      for (MapOutputValueCombiner val : values) {
         vec = val.get_values();
         num_vector_values = vec.size();
         for (int i = 0; i < num_vector_values; i++) {
            if (citations.get(vec.elementAt(i).get_paperID()) == null) {
               citations.put(vec.elementAt(i).get_paperID(), vec.elementAt(i).get_paperScore());
            } else {
               citations.put(vec.elementAt(i).get_paperID(),
                  vec.elementAt(i).get_paperScore() + citations.get(vec.elementAt(i).get_paperID()).floatValue());
            }
         }
         vec.clear();
      }

      Vector<Float> v = new Vector<Float>(citations.values());
      Collections.sort(v, Collections.reverseOrder());
      Iterator<Float> float_it = v.iterator();

      while (float_it.hasNext()) {
         entry++;
         score = (Float)float_it.next();
         if (score >= entry) {
            metric++;
         } else {
            break;
         }
      }
      context.write(key, new IntWritable(metric));
   }
}

Method 2-C

Method 2-C adds an in-Mapper Combiner to Method 2. In this case, however, the container data structure does not store a list of (paper, score) pairs for each author_name, but a single cumulative score value for each distinct (author_name, paper) pair. This limits the benefits of using a Combiner, because the (author_name, paper) keys are far more numerous than the plain author keys of Method 1-C. Note that keeping MapOutputKey objects in a Hashtable requires hashCode() to be overridden consistently with equals(), as shown in the MapOutputKey listing below. The reduce phase in this case is identical to that of Method 2.

Method 2-C: Mapper class (Algorithm 5 of the paper)
public static class Map4 extends Mapper<LongWritable, Text, MapOutputKey, FloatWritable> {
   private int s = 0, e = 0, s1 = 0, e1 = 0;
   Hashtable<MapOutputKey, Float> authors_papers = new Hashtable<MapOutputKey, Float>();

   public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
      String line = value.toString();
      String refXML = new String();
      String refID = new String();
      String ReferenceAuthors = new String();
      int refIDint;

      s = 0; e = 0;
      while (s != -1) {
         s = line.indexOf("<citation id=\"", s);
         if (s > 0) {
            s += 14;
            e = line.indexOf("</citation>", s);
            refXML = line.substring(s, e);

            s1 = 0; e1 = 0;
            s1 = refXML.indexOf("<clusterid>");
            if (s1 > 0) {
               s1 += 11;
               e1 = refXML.indexOf("</clusterid>", s1);
               refID = refXML.substring(s1, e1);
               refIDint = Integer.parseInt(refID);

               s1 = 0; e1 = 0;
               s1 = refXML.indexOf("<authors>");
               if (s1 > 0) {
                  s1 += 9;
                  e1 = refXML.indexOf("</authors>", s1);
                  ReferenceAuthors = refXML.substring(s1, e1);

                  String ReferenceAuthorsArray[] = ReferenceAuthors.split(",");
                  for (String a : ReferenceAuthorsArray) {
                     if (a.length() > 0 && refIDint > 0) {
                        MapOutputKey custom_key = new MapOutputKey(a, refIDint);
                        if (authors_papers.containsKey(custom_key)) {
                           authors_papers.put(custom_key, 1.0f + authors_papers.get(custom_key).floatValue());
                        } else {
                           authors_papers.put(custom_key, 1.0f);
                        }
                     }
                  }
               }
            }
         }
      }
   }

   public void cleanup(Context context) throws IOException, InterruptedException {
      // Emit one (author, paper) -> cumulative score pair per distinct composite key,
      // after the entire input split has been processed.
      Set<MapOutputKey> key_set = authors_papers.keySet();
      Iterator<MapOutputKey> itr = key_set.iterator();
      MapOutputKey k;
      while (itr.hasNext()) {
         k = itr.next();
         context.write(k, new FloatWritable(authors_papers.get(k).floatValue()));
      }
   }
}

Execution Driver

The MapReduce jobs are launched with three arguments: 1) the input directory (i.e., the location in HDFS where the input scientific articles reside), 2) the output directory (i.e., the location in HDFS where the framework will store the results of the task), and 3) the method to be executed (accepted values are '1', '2', '1-C' and '2-C'). Depending on the method we employ, we define the appropriate datatypes for the Mapper output/Reducer input.

MapReduce Driver
public static void main(String[] args) throws Exception {
   Configuration conf = new Configuration();
   Job job = new Job(conf, "citeseer");

   if (args[2].equals("1")) {
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(IntWritable.class);
      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(MapOutputValue.class);
      job.setMapperClass(Map1.class);
      job.setReducerClass(Reduce1.class);
   }

   if (args[2].equals("2")) {
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(IntWritable.class);
      job.setMapOutputKeyClass(MapOutputKey.class);
      job.setMapOutputValueClass(FloatWritable.class);
      job.setMapperClass(Map2.class);
      job.setReducerClass(Reduce2.class);
   }

   if (args[2].equals("1-C")) {
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(IntWritable.class);
      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(MapOutputValueCombiner.class);
      job.setMapperClass(Map3.class);
      job.setReducerClass(Reduce3.class);
   }

   if (args[2].equals("2-C")) {
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(IntWritable.class);
      job.setMapOutputKeyClass(MapOutputKey.class);
      job.setMapOutputValueClass(FloatWritable.class);
      job.setMapperClass(Map4.class);
      job.setReducerClass(Reduce2.class);   // the reduce phase of Method 2-C is identical to that of Method 2
   }

   job.setInputFormatClass(TextInputFormat.class);
   job.setOutputFormatClass(TextOutputFormat.class);

   FileInputFormat.addInputPath(job, new Path(args[0]));
   FileOutputFormat.setOutputPath(job, new Path(args[1]));

   job.waitForCompletion(true);
}
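
As an illustration, the job could be launched along the following lines (the jar name, the driver class name and the HDFS paths are hypothetical and serve only as an example):

hadoop jar scientometrics.jar ScientometricsDriver /user/hadoop/citeseer/input /user/hadoop/citeseer/output 1-C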

MapOutputValue class

Here we provide the implementation of the MapOutputValue class used by Method 1 and Method 1-C.

MapOutputValue.java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

public class MapOutputValue implements Writable {
   private int paperID;
   private float paperScore;

   public MapOutputValue() { this(0, 0.0f); }

   public MapOutputValue(int id, float score) {
      this.paperID = id;
      this.paperScore = score;
   }

   public void write(DataOutput out) throws IOException {
      out.writeInt(paperID);
      out.writeFloat(paperScore);
   }

   public void readFields(DataInput in) throws IOException {
      paperID = in.readInt();
      paperScore = in.readFloat();
   }

   public int get_paperID() { return this.paperID; }
   public float get_paperScore() { return this.paperScore; }
}

MapOutputKey class

Here we provide the implementation of the MapOutputKey class used by Methods 2 and 2-C.

MapOutputKey.java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.*;

public class MapOutputKey implements WritableComparable<MapOutputKey> {
   private Text author;
   private int paperID;

   public MapOutputKey() { set(new Text(), 0); }
   public MapOutputKey(String a, int pid) { set(new Text(a), pid); }
   public MapOutputKey(Text a, int pid) { set(a, pid); }

   public Text get_author() { return author; }
   public int get_paperID() { return paperID; }

   public void set(Text a, int pid) {
      author = a;
      paperID = pid;
   }

   @Override public void write(DataOutput out) throws IOException {
      author.write(out);
      out.writeInt(paperID);
   }

   @Override public void readFields(DataInput in) throws IOException {
      author.readFields(in);
      paperID = in.readInt();
   }

   @Override public boolean equals(Object o) {
      if (o instanceof MapOutputKey) {
         MapOutputKey mok = (MapOutputKey) o;
         return author.equals(mok.author) && (paperID == mok.paperID);
      }
      return false;
   }

   // hashCode() must be consistent with the overridden equals(); it is also required
   // because Method 2-C uses MapOutputKey as a Hashtable key.
   @Override public int hashCode() {
      return author.hashCode() * 163 + paperID;
   }

   @Override public int compareTo (MapOutputKey mok) {
      int cmp = author.compareTo(mok.author);
      if (cmp != 0) { return cmp; }
      return paperID - mok.paperID;
   }
}

MapOutputValueCombiner class

Here we provide the implementation of the MapOutputValueCombiner class used by Method 1-C.

MapOutputValueCombiner.java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.io.Writable;

public class MapOutputValueCombiner implements Writable {
   private int num_values;
   private Vector<MapOutputValue> MapOutputValuesVector;

   public MapOutputValueCombiner() {
      this.num_values = 0;
      this.MapOutputValuesVector = new Vector<MapOutputValue>();
   }

   public MapOutputValueCombiner(int id, float score) {
      this.MapOutputValuesVector = new Vector<MapOutputValue>();
      insert(id, score);
   }

   public void write(DataOutput out) throws IOException {
      out.writeInt(this.num_values);
      for (int i = 0; i < this.num_values; i++) {
         if (this.MapOutputValuesVector.elementAt(i) instanceof MapOutputValue) {
            out.writeInt(this.MapOutputValuesVector.elementAt(i).get_paperID());
            out.writeFloat(this.MapOutputValuesVector.elementAt(i).get_paperScore());
         }
      }
   }

   public void readFields(DataInput in) throws IOException {
      // Hadoop reuses Writable instances across records, so reset the internal
      // state before deserializing the next value.
      this.MapOutputValuesVector.clear();
      this.num_values = in.readInt();
      for (int i = 0; i < this.num_values; i++) {
         MapOutputValue temp = new MapOutputValue(in.readInt(), in.readFloat());
         this.MapOutputValuesVector.add(temp);
      }
   }

   public int get_num_values() { return this.num_values; }
   public Vector<MapOutputValue> get_values() { return this.MapOutputValuesVector; }

   public void insert(int id, float score) {
      for (int i = 0; i < this.num_values; i++) {
         if (this.MapOutputValuesVector.elementAt(i) instanceof MapOutputValue) {
            if (this.MapOutputValuesVector.elementAt(i).get_paperID() == id) {
               MapOutputValue temp = new MapOutputValue(id, score + this.MapOutputValuesVector.get(i).get_paperScore());
               this.MapOutputValuesVector.set(i, temp);
               return;
            }
         }
      }

      MapOutputValue temp = new MapOutputValue(id, score);
      this.MapOutputValuesVector.add(temp);
      this.num_values++;
   }
}