Friday 27 December 2019

PROCEDURE TO INSTALL HADOOP/SPARK ON WINDOWS7/10

Spark & Hadoop for Windows


1) Download the spark-2.4-bin-hadoop2.7.tgz file from https://spark.apache.org/downloads.html

2) Download winutils.exe (or the winutils-master repository).
  Link: search the web for "winutils Hadoop for Windows" and pick the build that matches Hadoop 2.7.
3) Create two folders in C:\
                 A) C:\spark
                 B) C:\winutils
                 Note: C:\winutils\bin must contain the winutils.exe built for Hadoop 2.7.

4) Set the following user environment variables (listed below)
5) Download the HDP sandbox OVA file from the Cloudera site
    and import it into Oracle VirtualBox with 16 GB RAM

name=HADOOP_HOME
value=C:\winutils
---------------------
name=SPARK_HOME
value=C:\spark
Path=;%SPARK_HOME%\bin
------------------------------------

Finally, run the command C:\spark> spark-shell and press Enter.

PySpark example: file name = ABC.py
C:\spark>pyspark
from pyspark.sql import SQLContext
sqlContext=SQLContext(sc)
df = sqlContext.read.format("csv").option("header", "true").load("c:/spark/data.csv")
df.show()
#df.select("Appeared")
#df.filter(df["Appeared"]==1991).show()
df.printSchema()
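
By default the CSV reader loads every column as a string, which is why printSchema() shows string types. A minimal sketch (assuming the same c:/spark/data.csv file and its Appeared column) that turns on schema inference so the year can be filtered numerically:

# Sketch: same CSV load with schema inference turned on (assumes c:/spark/data.csv)
df2 = sqlContext.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("c:/spark/data.csv")
df2.printSchema()                           # Appeared is now an integer column
df2.filter(df2["Appeared"] == 1991).show()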
 --------------------------------------------------------------------------------------------------------------

A newer way of working with Spark SQL in Python (SparkSession):

from pyspark.sql import SparkSession
spark = SparkSession.builder.config("spark.sql.warehouse.dir", "file:///C:/temp").appName("SparkSQL").getOrCreate()
# Read a MySQL table over JDBC (requires the MySQL JDBC driver on the classpath)
df = spark.read.format("jdbc").options(url="jdbc:mysql://127.0.0.1:3306/db_for_hadoop", driver="com.mysql.jdbc.Driver", dbtable="db_for_hadoop.testtable_for_spark", user="root", password="test").load()
df.show()
df.createOrReplaceTempView("mytemptable")
spark.sql("select * from mytemptable where Appeared_Year in (1991, 1995)").show()
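
The reverse direction works the same way. A minimal sketch of writing a DataFrame back into MySQL over JDBC; the target table name results_from_spark is hypothetical, and the driver and credentials are assumed to be the same as above:

# Sketch: write the query result back to MySQL (target table name is hypothetical)
result = spark.sql("select * from mytemptable where Appeared_Year in (1991, 1995)")
result.write.format("jdbc") \
    .options(url="jdbc:mysql://127.0.0.1:3306/db_for_hadoop",
             driver="com.mysql.jdbc.Driver",
             dbtable="db_for_hadoop.results_from_spark",
             user="root", password="test") \
    .mode("append") \
    .save()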

C:\SparkCourse>spark-submit ABC.py
 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
+------------+-----------------+-------------+--------+
|Pro language|      Designed by|Appeared_Year|File_Ext|
+------------+-----------------+-------------+--------+
|      Python| Guido van Rossum|         1991|     .py|
|        Java|    James Gosling|         1995|   .java|
|         C++|Bjarne Stroustrup|         1983|    .cpp|
+------------+-----------------+-------------+--------+

+----------------+----------------+-------------+--------+
|Programming lang|     Designed by|Appeared_Year|File_Ext|
+----------------+----------------+-------------+--------+
|          Python|Guido van Rossum|         1991|     .py|
|            Java|   James Gosling|         1995|   .java|
+----------------+----------------+-------------+--------+


Installing Spark and Python on Windows:
Install a JDK (Java Development Kit) from http://www.oracle.com/technetwork/java/javase/downloads/index.html. Install Java 8, and be sure to change the default installation location so the JDK lands in a path with no spaces, for example c:\jdk.
Download a pre-built version of Apache Spark 3 from https://spark.apache.org/downloads.html
C:\Users\<your username>\AppData\Local\Programs\Python\Python37\Lib\site-packages
Extract the Spark archive, and copy its contents into C:\spark after creating that directory. You should end up with directories like c:\spark\bin, c:\spark\conf, etc.
Download winutils.exe and move it into a C:\winutils\bin folder that you have created. (Note: this is a 64-bit application.)
Create a c:\tmp\hive directory, cd into c:\winutils\bin, and run winutils.exe chmod 777 c:\tmp\hive
Open the c:\spark\conf folder, and make sure "File name extensions" is checked in the View tab of Windows Explorer. Rename the log4j.properties.template file to log4j.properties. Edit this file (using WordPad or something similar) and change the log level from INFO to ERROR for log4j.rootCategory.
Right-click your Windows menu, select Control Panel, System and Security, and then System. Click on “Advanced System Settings” and then the “Environment Variables” button.
Add the following new USER variables:
SPARK_HOME c:\spark
JAVA_HOME (the path you installed the JDK to in step 1, for example C:\JDK)
HADOOP_HOME c:\winutils
Add the following paths to your PATH user variable:

%SPARK_HOME%\bin

%JAVA_HOME%\bin

Close the environment variable screen and the control panels.
Install the latest Anaconda for Python 3 from anaconda.com. Don’t install a Python 2.7 version! If you already use some other Python environment, that’s OK – you can use it instead, as long as it is a Python 3 environment.
 How to install Spyder 4.0.1 (command):
D:\python\spyder-4.0.1\spyder-4.0.1> pip3 install spyder

# Example related to RDDs

from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("abc")
sc = SparkContext(conf = conf)
#---------------------
#conf = SparkConf().setMaster("local[*]").setAppName("abc_2")   # use every local core
#
l = sc.textFile("file:///1/2/1.data")
m = l.map(lambda x: (int(x.split()[1]), 1))          # (key, 1) pairs
mc = m.reduceByKey(lambda x, y: x + y)               # count per key
f = mc.map(lambda xy: (xy[1], xy[0]))                # flip to (count, key)
sm = f.sortByKey()                                   # sort by count
results = sm.collect()
for res in results:
    print(res)
# Call persist() or cache() before the action to keep the RDD available for reuse.
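
As a minimal sketch of that last note (assuming the same mc RDD as above), caching before the first action keeps Spark from recomputing the whole lineage when a second action runs:

# Sketch: cache an RDD that more than one action will use (assumes mc from above)
mc.cache()                                           # or mc.persist() to pick a storage level
print(mc.count())                                    # first action materializes and caches the RDD
top5 = mc.takeOrdered(5, key=lambda kv: -kv[1])      # second action reuses the cached data
print(top5)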
-------------------------------------------------------------------------------------------------------------------------
Spark Structured Streaming sample example
--------------------------------------------------------------------------------------------------------------------------
4.175.197.170 - - [29/Dec/2016:05:32:30 +0000] "GET /sitemap_index.xml HTTP/1.0" 200 592 "-" "W3 Total Cache/0.9.4.1"
4.175.197.170 - - [29/Dec/2016:05:32:30 +0000] "GET /post-sitemap.xml HTTP/1.0" 200 2502 "-" "W3 Total Cache/0.9.4.1"
4.175.197.170 - - [29/Dec/2016:05:32:30 +0000] "GET /page-sitemap.xml HTTP/1.0" 200 11462 "-" "W3 Total Cache/0.9.4.1"
4.175.197.170 - - [29/Dec/2016:05:32:30 +0000] "GET /category-sitemap.xml HTTP/1.0" 200 585 "-" "W3 Total Cache/0.9.4.1"
-------------------------------------------------------------------------------------------------------------------------
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_extract
# make a SparkSession
spark = SparkSession.builder.config("spark.sql.warehouse.dir", "file:///C:/temp").appName("StructuredStreaming").getOrCreate()
# watch the logs directory for new log data, and read in the raw lines as a streaming DataFrame
accessLines = spark.readStream.text("logs")
# parse the common log format into a DataFrame using regular expressions
contentSizeExp = r'\s(\d+)$'
statusExp = r'\s(\d{3})\s'
generalExp = r'\"(\S+)\s(\S+)\s*(\S*)\"'
timeExp = r'\[(\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} [+-]\d{4})]'
hostExp = r'(^\S+\.[\S+\.]+\S+)\s'

logs = accessLines.select(regexp_extract('value', hostExp, 1).alias('host'),
                          regexp_extract('value', timeExp, 1).alias('timestamp'),
                          regexp_extract('value', generalExp, 1).alias('method'),
                          regexp_extract('value', generalExp, 2).alias('endpoint'),
                          regexp_extract('value', generalExp, 3).alias('protocol'),
                          regexp_extract('value', statusExp, 1).cast('integer').alias('status'),
                          regexp_extract('value', contentSizeExp, 1).cast('integer').alias('content_size'))
# keep a running count of every access by status code
statusCountsDF = logs.groupBy(logs.status).count()

# start the streaming query, dumping results to the screen
query = ( statusCountsDF.writeStream.outputMode("complete").format("console").queryName("counts").start() )
query.awaitTermination()
spark.stop()
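
The same pattern works for any other extracted column. A small sketch (assuming the logs DataFrame above) that keeps a running count per endpoint instead of per status code, sorted so the busiest URLs come first:

# Sketch: running count per endpoint, busiest first (assumes the logs DataFrame above)
from pyspark.sql import functions as func
endpointCountsDF = logs.groupBy("endpoint").count().orderBy(func.desc("count"))
endpointQuery = ( endpointCountsDF.writeStream.outputMode("complete")
                  .format("console").queryName("endpoint_counts").start() )
# then endpointQuery.awaitTermination() as above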
----------------------------------------------------------------------------------------------------------------------------
0.00,-0.07
-1.30,1.45
1.11,-1.16
-0.23,0.29
-0.14,0.13
-----------------------------------------------------------------------------------------------------------------------------

from __future__ import print_function
from pyspark.ml.regression import LinearRegression
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
if __name__ == "__main__":
    spark = SparkSession.builder.config("spark.sql.warehouse.dir", "file:///F:/tmp").appName("LR").getOrCreate()
    # Load up our data and convert it to the format MLlib expects.
    inputLines = spark.sparkContext.textFile("regression.txt")
    data = inputLines.map(lambda x: x.split(",")).map(lambda x: (float(x[0]), Vectors.dense(float(x[1]))))
    # Convert this RDD to a DataFrame
    colNames = ["label", "features"]
    df = data.toDF(colNames)
    # Split our data into training data and testing data (50/50)
    trainTest = df.randomSplit([0.5, 0.5])
    trainingDF = trainTest[0]
    testDF = trainTest[1]
    # Now create our linear regression model
    lir = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
    # Fit the model to our training data
    model = lir.fit(trainingDF)
    # Generate predictions for the test DataFrame
    fullPredictions = model.transform(testDF).cache()
    # Extract the predictions and the "known" correct labels
    predictions = fullPredictions.select("prediction").rdd.map(lambda x: x[0])
    labels = fullPredictions.select("label").rdd.map(lambda x: x[0])
    # Zip them together
    predictionAndLabel = predictions.zip(labels).collect()
    # Print the predicted and actual value for each point
    for prediction in predictionAndLabel:
      print(prediction)
    spark.stop()
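
To put a single number on how well the model did, here is a hedged sketch using pyspark.ml's RegressionEvaluator on the fullPredictions DataFrame from above ("rmse" and "r2" are standard metric names in that API):

# Sketch: score the test-set predictions (assumes fullPredictions from above)
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(fullPredictions)
r2 = evaluator.setMetricName("r2").evaluate(fullPredictions)
print("RMSE = %f, r2 = %f" % (rmse, r2))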
----------------------------------------------------------------------------------------------------------------------
   Hadoop definition is as follows:
A. Open source
B. A collection of software that runs on a cluster of computers
It uses the following concepts:
1) Distributed storage: computers (nodes) can be added to and removed from the cluster
2) Distributed processing of large data sets in a parallel manner
3) Commodity hardware, meaning readily available, inexpensive machines


 Other supported technologies for Hadoop
HBase acts as an interface for exposing data that resides in your Hadoop cluster to other transaction-processing systems. HBase is a fast, columnar data store.
Oozie is for scheduling jobs/tasks on your Hadoop cluster environment.
ZooKeeper keeps track of activities on your cluster environment, such as which node is up, which is down, and which computer becomes the master node in the Hadoop cluster.
Sqoop (SQL to Hadoop) is used for pulling data from transaction-processing databases, such as relational databases, into Hadoop.
Sqoop talks to ODBC, JDBC, and RDBMS systems to get data into the Hadoop cluster environment.
Storm and Spark Streaming are used for real-time data processing in the Hadoop environment.

HDFS Concept in Linux

[tahir_user@sandbox ~]$ hadoop fs -ls
[tahir_user@sandbox ~]$ hadoop fs -mkdir Tahir
[tahir_user@sandbox ~]$ wget http://localhost/hadoop/Tahir/mydata.data
[tahir_user@sandbox ~]$ hadoop fs -copyFromLocal mydata.data Tahir/mydata.data
[tahir_user@sandbox ~]$ hadoop fs -ls Tahir
[tahir_user@sandbox ~]$ hadoop fs -rm Tahir/mydata.data

Map and Reduce Concept on Excel

# Run on the Hadoop cluster:
python 1.py -r hadoop --hadoop-streaming-jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar yourdata.data
# Run locally:
python 1.py yourdata.data
from mrjob.job import MRJob
from mrjob.step import MRStep
class Books(MRJob):
    def steps(self):
        return [MRStep(mapper=self.mapper_get_BooksRating,reducer=self.reducer_count_BooksRating)]

    def mapper_get_BooksRating(self, _, line):
        (Student_id, Book_ID, Book_Rating, Buying_Time) = line.split('\t')
        yield Book_Rating, 1

    def reducer_count_BooksRating(self, key, values):
        yield key, sum(values)

if __name__ == '__main__':
       Books.run()
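
If the counts should come back sorted by how often each rating appears, a second MRStep can be chained on. This is a minimal sketch under the same tab-separated input format; the zero-padding trick keeps the string-based shuffle sort behaving like a numeric sort:

# Sketch: same job plus a second step that sorts ratings by their count
from mrjob.job import MRJob
from mrjob.step import MRStep

class BooksSortedByRating(MRJob):
    def steps(self):
        return [MRStep(mapper=self.mapper_get_BooksRating,
                       reducer=self.reducer_count_BooksRating),
                MRStep(reducer=self.reducer_sorted_output)]

    def mapper_get_BooksRating(self, _, line):
        (Student_id, Book_ID, Book_Rating, Buying_Time) = line.split('\t')
        yield Book_Rating, 1

    def reducer_count_BooksRating(self, key, values):
        # zero-pad the count so it sorts numerically in the next shuffle
        yield str(sum(values)).zfill(5), key

    def reducer_sorted_output(self, count, ratings):
        for rating in ratings:
            yield rating, count

if __name__ == '__main__':
    BooksSortedByRating.run()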

Importing data from MySQL to Hadoop using sqoop tool 

mysql -u root -p
GRANT ALL PRIVILEGES ON YourDataBase.* TO ''@'localhost';
sqoop import --connect jdbc:mysql://localhost/DB --driver com.mysql.jdbc.Driver --table YourTable -m 1

sqoop export --connect jdbc:mysql://localhost/DB -m 1 --driver com.mysql.jdbc.Driver --table YourTable --export-dir /apps/hive/warehouse/movie --input-fields-terminated-by '\0001'



About HBase: HBase is written in Java.
In HBase we can query like this:
"What is the value for this given key?" or "Store this value for this key."
Operations in HBase:
CRUD = Create, Read, Update, Delete; these are the APIs in HBase for controlling the database.
HBase partitions keys into ranges, and each range of keys talks to a Region Server.
About the REST service:
A REST service is built into HBase,
meaning you can get results through HTTP requests.
BookRating will be your column family in HBase.
Example: give me all the book ratings for a given Reader_ID, i.e. the books bought by a particular reader.
Each Reader_ID row will have a BookRating column family, for example:
Row key: Reader_ID
Columns: BookRating:BookID=44 = 6 star, BookRating:BookID=200 = 1 star

A Python client program communicates with the REST service; the REST service in turn negotiates with HBase, and HBase talks to the HDFS system (see the sketch after the daemon commands below).
/usr/hdp/current/hbase-master/bin/hbase-daemon.sh start rest -p 8000 -infoport 8001
/usr/hdp/current/hbase-master/bin/hbase-daemon.sh stop rest
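
As a minimal sketch of that flow (an illustration, not the course's code): assuming the REST gateway started above on port 8000, the My_Users table created in the shell session below, and the third-party requests library, a Python client can read and write cells over HTTP. HBase's REST interface exchanges base64-encoded row keys, column names, and values in JSON.

# Sketch: talk to HBase through its REST gateway (host, port, and table assumed from this page)
import base64, json, requests

BASE = "http://sandbox.hortonworks.com:8000"

def b64(s):
    # HBase REST expects row keys, column names, and values as base64 strings
    return base64.b64encode(s.encode("utf-8")).decode("ascii")

# Write one cell: row '1', column MyUser_Detail:name, value 'Tahir'
row = {"Row": [{"key": b64("1"),
                "Cell": [{"column": b64("MyUser_Detail:name"), "$": b64("Tahir")}]}]}
requests.put(BASE + "/My_Users/1", json=row,
             headers={"Content-Type": "application/json", "Accept": "application/json"})

# Read the row back as JSON
resp = requests.get(BASE + "/My_Users/1", headers={"Accept": "application/json"})
print(json.loads(resp.text))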
[root@sandbox maria_dev]# hbase shell
hbase(main):001:0> create 'My_Users','MyUser_Detail'
(creates a table named My_Users with the column family MyUser_Detail)
=> Hbase::Table - My_Users
hbase(main):002:0> list
TABLE
My_Users
hbase(main):003:0> scan 'My_Users'
 

Hadoop OOZIE 
<?xml version="1.0" encoding="UTF-8"?>
<workflow-app xmlns="uri:oozie:workflow:0.1" name="oo-mm">
    <start to="sqoop-node"/>
    <action name="sqoop-node">
        <sqoop xmlns="uri:oozie:sqoop-action:0.1">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}/user/maria_dev/Yourtable"/>
            </prepare>

            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <command>import --connect jdbc:mysql://localhost/DB --driver com.mysql.jdbc.Driver --table Yourtable -m 1</command>
        </sqoop>
        <ok to="hive-node"/>
        <error to="fail"/>
    </action>

    <action name="hive-node">
        <hive xmlns="uri:oozie:hive-action:0.1">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}/user/maria_dev/OF"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <script>OF.sql</script>
            <param>OUTPUT=/user/maria_dev/OF</param>
        </hive>
        <ok to="end"/>
        <error to="fail"/>
    </action>
<kill name="fail">
        <message>Sqoop failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>

--------------------------------------------------------------------------------------
nameNode=hdfs://sandbox.hortonworks.com:8010
jobTracker=sandbox.hortonworks.com:8040
queueName=default
oozie.use.system.libpath=true
oozie.wf.application.path=${nameNode}/user/Tahir_dev
(the above is the job.properties file)
------------------------------------------------------------------------------------------------------
wget http://localhost/hadoop/1.sql
wget http://localhost/hadoop/job.properties
[Tahir_dev@sandbox ~]$ hadoop fs -put wf.xml  /user/maria_dev
[Tahir_dev@sandbox ~]$ hadoop fs -put 1.sql  /user/maria_dev
[Tahir_dev@sandbox ~]$ hadoop fs -put /usr/share/java/mysql-connector-java.jar /user/oozie/share/lib/lib_20171025075233/sqoop
[Tahir_dev@sandbox ~]$ oozie job -oozie http://localhost:13000/oozie -config /home/maria_dev/job.properties -run
------------------------------------------------------------------------
---combine textfiles--batchfile
@ECHO OFF
SET first=y
SET newfile=new.csv
for %%F in (*.csv) do IF NOT %%F==%newfile% (
  if defined first (
    COPY /y "%%F" %newfile% >nul
    set "first="
  ) else (
    FOR /f "skip=1 delims=" %%i IN (%%F) DO >> %newfile% ECHO %%i
  )
)
--------------------------------------------
@ECHO  OFF
set msg="TAHIR"
for %%x in (*.csv) do (
ECHO  %msg%
)
pause

FOR /L %%A IN (1,1,200) DO (
  ECHO %%A
)

@echo off
SET var1="Yes"
SET var2="No"
SET var3="Yes"
if %var1%=="Yes" echo Var1 set
if %var2%=="Yes" echo Var2 set
if %var3%=="Yes" echo Var3 set
-------------------------------
--------------------------------------------------------------------------
kafka producer
----------------------------------------------------------------------------
[maria_dev@sandbox ~]$ cd /usr/hdp/current/kafka-broker/
[maria_dev@sandbox bin]$ ./kafka-topics.sh  --create --zookeeper sandbox.hortonworks.com:2218 --replication-factor 1 --partitions 1 --topic tahir
Created topic "tahir".
[maria_dev@sandbox bin]$ ./kafka-topics.sh  --list --zookeeper sandbox.hortonworks.com:2218
[maria_dev@sandbox bin]$ ./kafka-console-producer.sh --broker-list sandbox.hortonworks.com:9997 --topic tahir
I am mr tahir sending some data to the topic tahir
this is tahir data

------------------------------------------------------------------------------
kafka consumer
-------------------------------------------------------------------------
[maria_dev@sandbox ~]$ cd /usr/hdp/current/kafka-broker/bin
[maria_dev@sandbox bin]$ ./kafka-console-consumer.sh --bootstrap-server sandbox.hortonworks.com:9997 --topic tahir --from-beginning
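
The same consumption can be done from Python. A minimal sketch using the third-party kafka-python package (an assumption; it is not part of the sandbox and needs pip install kafka-python), pointed at the same broker and topic as above:

# Sketch: read the 'tahir' topic from Python (assumes `pip install kafka-python`)
from kafka import KafkaConsumer

consumer = KafkaConsumer('tahir',
                         bootstrap_servers='sandbox.hortonworks.com:9997',
                         auto_offset_reset='earliest')   # like --from-beginning
for message in consumer:
    print(message.value.decode('utf-8'))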

-----------
Kafka and Flume are used as streaming technologies
--------------------------------------------------------------------------------------
To configure Kafka Connect we need to edit 3 files in the HDP framework:
1) connect-standalone.properties: change bootstrap.servers=sandbox.hortonworks.com:<port>
2) connect-file-sink.properties: change file=/home/Tahir_dev/logout.txt and topic=yourlog_test
   (this is the file the results will be written to)
3) connect-file-source.properties: change file=/home/Tahir_dev/access and topic=yourlog_test
   (this is the file the connector listens to)

------------------------------------------
Set up our consumer
------------------------------------------
Step 1: run ./kafka-console-consumer.sh with its 3 parameters (bootstrap server, topic, --from-beginning)
------------------------------------------
To kick off the connector
-------------------------------------------
cd /usr/hdp/current/kafka-broker/bin/
then run the file
./connect-standalone.sh (passing the three properties files edited above)
--------------------------------------------------------------------
Popular query engines for the Hadoop system:
1) Hue
2) Drill
3) Presto
----------------------------------------------------------
A Flume agent has 3 concepts:
1) Source
2) Channel
3) Sink
Flume lives in /usr/hdp/current/flume-server/
$bin/flume-ng agent --config