I'm trying to run a MapReduce job on Hadoop Streaming with Python scripts, and everything works fine when I run the scripts from the Jupyter terminal.
But when I run the following command:
./bin/hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.7.7.jar -file /usr/local/hadoop/python/assignmap1.py /usr/local/hadoop/python/assignreduce1.py -mapper "python assignmap1.py" -reducer "python assignreduce1.py" -input input1/data.txt -output output
it fails with this error, and both map and reduce stay at 0%:
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:322)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:535)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:453)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1762)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Here is my mapper code:
import sys
import pandas as pd

def map():
    dataset = pd.read_table(sys.stdin)
    for a in range(len(dataset)):
        key = dataset.loc[a][0]
        date = dataset.loc[a][1]
        value = dataset.loc[a][3]
        #return(mapped)
        print(" ".join([key,date,value]))

if __name__ == "__main__":
    map()
and here is my reducer code:
import sys
import pandas as pd

def reduce():
    temperature = {"City":["Date","Temp",""]}
    for data in sys.stdin:
        splited = data.split(" ")
        key = splited[0].strip()
        date = splited[1].strip()
        tem = splited[2].strip()
        temp = int(tem[0:2])
        abstemp = abs(25 - temp)
        if temperature.get(key) == None:
            temperature.update({key:[date,temp,abstemp]})
        else:
            temp_last = temperature.get(key)[2]
            if temp_last > abstemp:
                temperature.update({key:[date,temp,abstemp]})
    for key in temperature:
        print(" ".join([key,str(temperature.get(key)[0]),str(temperature.get(key)[1])]))

if __name__ == "__main__":
    reduce()
I have no idea what the problem is. The Hadoop configuration should be correct, because I am using a Docker image that should be set up properly.