<menu id="guoca"></menu>
<nav id="guoca"></nav><xmp id="guoca">
  • <xmp id="guoca">
  • <nav id="guoca"><code id="guoca"></code></nav>
  • <nav id="guoca"><code id="guoca"></code></nav>

    Spark + Canal + Kafka打造Mysql增量數據實時進行監測分析

    VSole2022-08-04 17:03:03

    Spark中的Spark Streaming可以用于實時流項目的開發,實時流項目的數據源除了可以來源于日志、文件、網絡端口等,常常也有這種需求,那就是實時分析處理MySQL中的增量數據。

    面對這種需求當然我們可以通過JDBC的方式定時查詢Mysql,然后再對查詢到的數據進行處理也能得到預期的結果,但是Mysql往往還有其他業務也在使用,這些業務往往比較重要,通過JDBC方式頻繁查詢會對Mysql造成大量無形的壓力,甚至可能會影響正常業務的使用,在基本不影響其他Mysql正常使用的情況下完成對增量數據的處理,那就需要Canal了。

    假設Mysql中canal_test庫下有一張表policy_cred,需要統計實時統計policy_status狀態為1的mor_rate的的變化趨勢,并標注比率的風險預警等級。

     1. Canal

    Canal [k?'n?l] 是阿里巴巴開源的純java開發的基于數據庫binlog的增量訂閱&消費組件。Canal的原理是模擬為一個Mysql slave的交互協議,偽裝自己為MySQL slave,向Mysql Master發送dump協議,然后Mysql master接收到這個請求后將binary log推送給slave(也就是Canal),Canal解析binary log對象。

    關于Canal,建議閱讀:監聽MySQL的binlog日志工具分析:Canal、Maxwell

    1.1 Canal 安裝

    Canal的server mode在1.1.x版本支持的有TPC、Kafka、RocketMQ。本次安裝的canal版本為1.1.2,Canal版本最后在1.1.1之后。server端采用MQ模式,MQ選用Kafka。服務器系統為Centos7,其他環境為:jdk8、Scala 2.11、Mysql、Zookeeper、Kafka。

    1.1.1 準備

    安裝Canal之前我們先把如下安裝好

    Mysql

    a.如果沒有Mysql: 詳細的安裝過程可參考我的另一篇博客[Centos7環境下離線安裝mysql 5.7 / mysql 8.0]

    b.開啟Mysql的binlog。修改/etc/my.cnf,在[mysqld]下添加如下配置,改完之后重啟 Mysql/etc/init.d/mysql restart

    [mysqld]
    #添加這一行就ok
    log-bin=mysql-bin
    #選擇row模式
    binlog-format=ROW
    #配置mysql replaction需要定義,不能和canal的slaveId重復
    server_id=1
    

    c. 創建一個Mysql用戶并賦予相應權限,用于Canal使用

    mysql>  CREATE USER canal IDENTIFIED BY 'canal';  
    mysql>  GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    mysql>  GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
    mysql>  FLUSH PRIVILEGES;
    

    Zookeeper

    因為安裝Kafka時需要Zookeeper,例如ZK安裝后地址為:cdh3:2181,cdh4:2181,cdh5:2181

    Kafka

    例如安裝后的地址為:node1:9092,node2:9092,node3:9092 安裝后創建一個Topic,例如創建一個 example

    kafka-topics.sh --create --zookeeper cdh3:2181,cdh4:2181,cdh5:2181 --partitions 2 --replication-factor 1 --topic example
    

    1.1.2 安裝Canal

    1.下載Canal

    訪問Canal的Release頁 canal v1.1.2 wget https://github.com/alibaba/canal/releases/download/canal-1.1.2/canal.deployer-1.1.2.tar.gz

    2.解壓

    注意 這里一定要先創建出一個目錄,直接解壓會覆蓋文件 mkdir -p /usr/local/canal mv canal.deployer-1.1.2.tar.gz /usr/local/canal/ tar -zxvf canal.deployer-1.1.2.tar.gz

    3.修改instance 配置文件

    vim $CANAL_HOME/conf/example/instance.properties,修改如下項,其他默認即可

    ## mysql serverId , v1.0.26+ will autoGen , 不要和server_id重復
    canal.instance.mysql.slaveId=3
    # position info。Mysql的url
    canal.instance.master.address=node1:3306
    # table meta tsdb info
    canal.instance.tsdb.enable=false
    # 這里配置前面在Mysql分配的用戶名和密碼
    canal.instance.dbUsername=canal
    canal.instance.dbPassword=canal
    canal.instance.connectionCharset=UTF-8
    # 配置需要檢測的庫名,可以不配置,這里只檢測canal_test庫
    canal.instance.defaultDatabaseName=canal_test
    # enable druid Decrypt database password
    canal.instance.enableDruid=false
    # 配置過濾的正則表達式,監測canal_test庫下的所有表
    canal.instance.filter.regex=canal_test\\..*
    # 配置MQ
    ## 配置上在Kafka創建的那個Topic名字
    canal.mq.topic=example
    ## 配置分區編號為1
    canal.mq.partition=1
    

    4.修改canal.properties配置文件

    vim $CANAL_HOME/conf/canal.properties,修改如下項,其他默認即可

    # 這個是如果開啟的是tcp模式,會占用這個11111端口,canal客戶端通過這個端口獲取數據
    canal.port = 11111
    # 可以配置為:tcp, kafka, RocketMQ,這里配置為kafka
    canal.serverMode = kafka
    # 這里將這個注釋掉,否則啟動會有一個警告
    #canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
    ##################################################
    #########              MQ              #############
    ##################################################
    canal.mq.servers = node1:9092,node2:9092,node3:9092
    canal.mq.retries = 0
    canal.mq.batchSize = 16384
    canal.mq.maxRequestSize = 1048576
    canal.mq.lingerMs = 1
    canal.mq.bufferMemory = 33554432
    Canal的batch size, 默認50K, 由于kafka最大消息體限制請勿超過1M(900K以下)
    canal.mq.canalBatchSize = 50
    # Canal get數據的超時時間, 單位: 毫秒, 空為不限超時
    canal.mq.canalGetTimeout = 100
    # 是否為flat json格式對象
    canal.mq.flatMessage = true
    canal.mq.compressionType = none
    canal.mq.acks = all
    # kafka消息投遞是否使用事務
    #canal.mq.transaction = false
    

    5.啟動Canal

    $CANAL_HOME/bin/startup.sh
    

    6.驗證

    查看日志:啟動后會在logs下生成兩個日志文件:logs/canal/canal.log、logs/example/example.log,查看這兩個日志,保證沒有報錯日志。

    如果是在虛擬機安裝,最好給2個核數以上。

    確保登陸的系統的hostname可以ping通。

    在Mysql數據庫中進行增刪改查的操作,然后查看Kafka的topic為example的數據:

    kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --from-beginning --topic example
    

    7.關閉Canal

    不用的時候一定要通過這個命令關閉,如果是用kill或者關機,當再次啟動依然會提示要先執行stop.sh腳本后才能再啟動。

    $CANAL_HOME/bin/stop.sh
    

    1.2 Canal客戶端代碼

    如果我們不使用Kafka作為Canal客戶端,我們也可以用代碼編寫自己的Canal客戶端,然后在代碼中指定我們的數據去向。此時只需要將canal.properties配置文件中的canal.serverMode值改為tcp。編寫我們的客戶端代碼。在Maven項目的pom中引入:

    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <version>1.1.2</version>
    </dependency>
    

    編寫代碼:

    /**
     * Canal客戶端。
     *  注意:canal服務端只會連接一個客戶端,當啟用多個客戶端時,其他客戶端是就無法獲取到數據。所以啟動一個實例即可
     * @see <a >官方文檔:ClientSample代碼</a>
     */
    public class SimpleCanalClientExample {
        public static void main(String args[]) {
            /**
             * 創建鏈接
             *      SocketAddress: 如果提交到canal服務端所在的服務器上運行這里可以改為 new InetSocketAddress(AddressUtils.getHostIp(), 11111)
             *      destination 通服務端canal.properties中的canal.destinations = example配置對應
             *      username:
             *      password:
             */
            CanalConnector connector = CanalConnectors.newSingleConnector(
                    new InetSocketAddress("node1", 11111),
                    "example", "", "");
            int batchSize = 1000;
            int emptyCount = 0;
            try {
                connector.connect();
                connector.subscribe(".*\\..*");
                connector.rollback();
                int totalEmptyCount = 120;
                while (emptyCount < totalEmptyCount) {
                    Message message = connector.getWithoutAck(batchSize); // 獲取指定數量的數據
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId == -1 || size == 0) {
                        emptyCount++;
                        System.out.println("empty count : " + emptyCount);
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                        }
                    } else {
                        emptyCount = 0;
                        // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                        printEntry(message.getEntries());
                    }
                    connector.ack(batchId); // 提交確認
                    // connector.rollback(batchId); // 處理失敗, 回滾數據
                }
                System.out.println("empty too many times, exit");
            } finally {
                connector.disconnect();
            }
        }
        private static void printEntry(List<Entry> entrys) {
            for (Entry entry : entrys) {
                if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                    continue;
                }
                RowChange rowChage = null;
                try {
                    rowChage = RowChange.parseFrom(entry.getStoreValue());
                } catch (Exception e) {
                    throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                            e);
                }
                EventType eventType = rowChage.getEventType();
                System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                        entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                        entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                        eventType));
                /**
                 * 如果只對某些庫的數據操作,可以加如下判斷:
                 * if("庫名".equals(entry.getHeader().getSchemaName())){
                 *      //TODO option
                 *  }
                 * 
                 * 如果只對某些表的數據變動操作,可以加如下判斷:
                 * if("表名".equals(entry.getHeader().getTableName())){
                 *     //todo option
                 * }
                 * 
                 */
                for (RowData rowData : rowChage.getRowDatasList()) {
                    if (eventType == EventType.DELETE) {
                        printColumn(rowData.getBeforeColumnsList());
                    } else if (eventType == EventType.INSERT) {
                        printColumn(rowData.getAfterColumnsList());
                    } else {
                        System.out.println("-------&gt; before");
                        printColumn(rowData.getBeforeColumnsList());
                        System.out.println("-------&gt; after");
                        printColumn(rowData.getAfterColumnsList());
                    }
                }
            }
        }
        private static void printColumn(List<Column> columns) {
            for (Column column : columns) {
                System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
            }
        }
    }
    

    本地運行上述代碼,我們修改Mysql數據中的數據,可在控制臺中看到數據的改變:

    empty count : 20
    empty count : 21
    empty count : 22
    ================&gt; binlog[mysql-bin.000009:1510] , name[canal_test,customer] , eventType : INSERT
    id : 4    update=true
    name : spark    update=true
    empty count : 1
    empty count : 2
    empty count : 3
    

     2. Spark

    通過上一步我們已經能夠獲取到 canal_test 庫的變化數據,并且已經可將將變化的數據實時推送到Kafka中,Kafka中接收到的數據是一條Json格式的數據,我們需要對 INSERT 和 UPDATE 類型的數據處理,并且只處理狀態為1的數據,然后需要計算 mor_rate 的變化,并判斷 mor_rate 的風險等級,0-75%為G1等級,75%-80%為R1等級,80%-100%為R2等級。最后將處理的結果保存到DB,可以保存到Redis、Mysql、MongoDB,或者推送到Kafka都可以。這里是將結果數據保存到了Mysql。

    2.1 在Mysql中創建如下兩張表

    -- 在canal_test庫下創建表
    CREATE TABLE `policy_cred` (
        p_num varchar(22) NOT NULL,
        policy_status varchar(2) DEFAULT NULL COMMENT '狀態:0、1',
        mor_rate decimal(20,4) DEFAULT NULL,
        load_time datetime DEFAULT NULL,
        PRIMARY KEY (`p_num`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    -- 在real_result庫下創建表
    CREATE TABLE `real_risk` (
        p_num varchar(22) NOT NULL,
        risk_rank varchar(8) DEFAULT NULL COMMENT '等級:G1、R1、R2',
        mor_rate decimal(20,4) ,
        ch_mor_rate decimal(20,4),
        load_time datetime DEFAULT NULL
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    

    2.2 Spark代碼開發

    2.2.1 在resources下new一個項目的配置文件my.properties

    ## spark
    # spark://cdh3:7077
    spark.master=local[2]
    spark.app.name=m_policy_credit_app
    spark.streaming.durations.sec=10
    spark.checkout.dir=src/main/resources/checkpoint
    ## Kafka
    bootstrap.servers=node1:9092,node2:9092,node3:9092
    group.id=m_policy_credit_gid
    # latest, earliest, none
    auto.offset.reset=latest
    enable.auto.commit=false
    kafka.topic.name=example
    ## Mysql
    mysql.jdbc.driver=com.mysql.jdbc.Driver
    mysql.db.url=jdbc:mysql://node1:3306/real_result
    mysql.user=root
    mysql.password=123456
    mysql.connection.pool.size=10
    2.2.2 在pom.xml文件中引入如下依賴
    <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
            <maven.compiler.source>1.8</maven.compiler.source>
            <maven.compiler.target>1.8</maven.compiler.target>
            <scala.version>2.11.8</scala.version>
            <spark.version>2.4.0</spark.version>
            <canal.client.version>1.1.2</canal.client.version>
        </properties>
        <dependencies>
            <dependency>
                <groupId>com.alibaba.otter</groupId>
                <artifactId>canal.client</artifactId>
                <version>${canal.client.version}</version>
                <exclusions>
                    <exclusion>
                        <groupId>io.netty</groupId>
                        <artifactId>netty-all</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>${scala.version}</version>
            </dependency>
            <!-- Spark -->
            <!-- spark-core -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <!-- spark-streaming -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_2.11</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <!-- spark-streaming-kafka -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <!-- spark-sql -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.11</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>2.6.1</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.51</version>
            </dependency>
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.47</version>
            </dependency>
        </dependencies>
    

    2.2.3 在scala源碼目錄下的包下編寫配置文件的工具類

    package yore.spark
    import java.util.Properties
    /**
      * Properties的工具類
      *
      * Created by yore on 2018-06-29 14:05
      */
    object PropertiesUtil {
      private val properties: Properties = new Properties
      /**
        *
        * 獲取配置文件Properties對象
        *
        * @author yore
        * @return java.util.Properties
        */
      def getProperties() :Properties = {
        if(properties.isEmpty){
          //讀取源碼中resource文件夾下的my.properties配置文件
          val reader = getClass.getResourceAsStream("/my.properties")
          properties.load(reader)
        }
        properties
      }
      /**
        *
        * 獲取配置文件中key對應的字符串值
        *
        * @author yore
        * @return java.util.Properties
        */
      def getPropString(key : String) : String = {
        getProperties().getProperty(key)
      }
      /**
        *
        * 獲取配置文件中key對應的整數值
        *
        * @author yore
    */
    def getPropInt(key : String) : Int = {
        getProperties().getProperty(key).toInt
      }
      /**
        *
        * 獲取配置文件中key對應的布爾值
        *
        * @return java.util.Properties
        */
      def getPropBoolean(key : String) : Boolean = {
        getProperties().getProperty(key).toBoolean
      }
    }
    

    2.2.4 在scala源碼目錄下的包下編寫數據庫操作的工具類

    package yore.spark
    import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet, SQLException}
    import java.util.concurrent.LinkedBlockingDeque
    import scala.collection.mutable.ListBuffer
    /**
      *
      * Created by yore on 2018/11/14 20:34
      */
    object JDBCWrapper {
      private var jdbcInstance : JDBCWrapper = _
      def getInstance() : JDBCWrapper = {
        synchronized{
          if(jdbcInstance == null){
            jdbcInstance = new JDBCWrapper()
          }
        }
        jdbcInstance
      }
    }
    class JDBCWrapper {
      // 連接池的大小
      val POOL_SIZE : Int = PropertiesUtil.getPropInt("mysql.connection.pool.size")
      val dbConnectionPool = new LinkedBlockingDeque[Connection](POOL_SIZE)
      try
        Class.forName(PropertiesUtil.getPropString("mysql.jdbc.driver"))
      catch {
        case e: ClassNotFoundException => e.printStackTrace()
      }
      for(i <- 0 until POOL_SIZE){
        try{
          val conn = DriverManager.getConnection(
            PropertiesUtil.getPropString("mysql.db.url"),
            PropertiesUtil.getPropString("mysql.user"),
            PropertiesUtil.getPropString("mysql.password"));
          dbConnectionPool.put(conn)
        }catch {
          case e : Exception => e.printStackTrace()
        }
      }
      def getConnection(): Connection = synchronized{
        while (0 == dbConnectionPool.size()){
          try{
            Thread.sleep(20)
          }catch {
            case e : InterruptedException => e.printStackTrace()
          }
        }
        dbConnectionPool.poll()
      }
      /**
        * 批量插入
        *
        * @param sqlText    sql語句字符
        * @param paramsList 參數列表
        * @return Array[Int]
        */
      def doBatch(sqlText: String, paramsList: ListBuffer[ParamsList]): Array[Int] = {
        val conn: Connection = getConnection()
        var ps: PreparedStatement = null
        var result: Array[Int] = null
        try{
          conn.setAutoCommit(false)
          ps = conn.prepareStatement(sqlText)
          for (paramters <- paramsList) {
            paramters.params_Type match {
              case "real_risk" => {
                println("$$$\treal_risk\t" + paramsList)
                // // p_num, risk_rank, mor_rate, ch_mor_rate, load_time
                ps.setObject(1, paramters.p_num)
                ps.setObject(2, paramters.risk_rank)
                ps.setObject(3, paramters.mor_rate)
                ps.setObject(4, paramters.ch_mor_rate)
                ps.setObject(5, paramters.load_time)
              }
            }
            ps.addBatch()
          }
          result = ps.executeBatch
          conn.commit()
        } catch {
          case e: Exception => e.printStackTrace()
        } finally {
          if (ps != null) try {
            ps.close()
          } catch {
            case e: SQLException => e.printStackTrace()
          }
          if (conn != null) try {
            dbConnectionPool.put(conn)
          } catch {
            case e: InterruptedException => e.printStackTrace()
          }
        }
        result
      }
    }
    

    2.2.5 在scala源碼目錄下的包下編寫Spark程序代碼

    package yore.spark
    import com.alibaba.fastjson.{JSON, JSONArray, JSONObject}
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import scala.collection.mutable.ListBuffer
    /**
      *
      * Created by yore on 2019/3/16 15:11
      */
    object M_PolicyCreditApp {
      def main(args: Array[String]): Unit = {
        // 設置日志的輸出級別
        Logger.getLogger("org").setLevel(Level.ERROR)
        val conf = new SparkConf()
          .setMaster(PropertiesUtil.getPropString("spark.master"))
          .setAppName(PropertiesUtil.getPropString("spark.app.name"))
          // !!必須設置,否則Kafka數據會報無法序列化的錯誤
          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        //如果環境中已經配置HADOOP_HOME則可以不用設置hadoop.home.dir
        System.setProperty("hadoop.home.dir", "/Users/yoreyuan/soft/hadoop-2.9.2")
        val ssc = new StreamingContext(conf,  Seconds(PropertiesUtil.getPropInt("spark.streaming.durations.sec").toLong))
        ssc.sparkContext.setLogLevel("ERROR")
        ssc.checkpoint(PropertiesUtil.getPropString("spark.checkout.dir"))
        val kafkaParams = Map[String, Object](
          "bootstrap.servers" -> PropertiesUtil.getPropString("bootstrap.servers"),
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> PropertiesUtil.getPropString("group.id"),
          "auto.offset.reset" -> PropertiesUtil.getPropString("auto.offset.reset"),
          "enable.auto.commit" -> (PropertiesUtil.getPropBoolean("enable.auto.commit"): java.lang.Boolean)
        )
        val topics = Array(PropertiesUtil.getPropString("kafka.topic.name"))
        val kafkaStreaming = KafkaUtils.createDirectStream[String, String](
          ssc,
          PreferConsistent,
          Subscribe[String, String](topics, kafkaParams)
        )
        kafkaStreaming.map[JSONObject](line => { // str轉成JSONObject
          println("$$$\t" + line.value())
          JSON.parseObject(line.value)
        }).filter(jsonObj =>{   // 過濾掉非 INSERT和UPDATE的數據
          if(null == jsonObj || !"canal_test".equals(jsonObj.getString("database")) ){
            false
          }else{
            val chType = jsonObj.getString("type")
            if("INSERT".equals(chType) || "UPDATE".equals(chType)){
              true
            }else{
              false
            }
          }
        }).flatMap[(JSONObject, JSONObject)](jsonObj => {   // 將改變前和改變后的數據轉成Tuple
          var oldJsonArr: JSONArray = jsonObj.getJSONArray("old")
          val dataJsonArr: JSONArray = jsonObj.getJSONArray("data")
          if("INSERT".equals(jsonObj.getString("type"))){
            oldJsonArr = new JSONArray()
            val oldJsonObj2 = new JSONObject()
            oldJsonObj2.put("mor_rate", "0")
            oldJsonArr.add(oldJsonObj2)
          }
          val result = ListBuffer[(JSONObject, JSONObject)]()
          for(i <- 0 until oldJsonArr.size ) {
            val jsonTuple = (oldJsonArr.getJSONObject(i), dataJsonArr.getJSONObject(i))
            result += jsonTuple
          }
          result
        }).filter(t => {  // 過濾狀態不為1的數據,和mor_rate沒有改變的數據
          val policyStatus = t._2.getString("policy_status")
          if(null != policyStatus && "1".equals(policyStatus) && null!= t._1.getString("mor_rate")){
            true
          }else{
            false
          }
        }).map(t => {
          val p_num = t._2.getString("p_num")
          val nowMorRate = t._2.getString("mor_rate").toDouble
          val chMorRate = nowMorRate - t._1.getDouble("mor_rate")
          val riskRank = gainRiskRank(nowMorRate)
          // p_num, risk_rank, mor_rate, ch_mor_rate, load_time
          (p_num, riskRank, nowMorRate, chMorRate, new java.util.Date)
        }).foreachRDD(rdd => {
          rdd.foreachPartition(p => {
            val paramsList = ListBuffer[ParamsList]()
            val jdbcWrapper = JDBCWrapper.getInstance()
            while (p.hasNext){
              val record = p.next()
              val paramsListTmp = new ParamsList
              paramsListTmp.p_num = record._1
              paramsListTmp.risk_rank = record._2
              paramsListTmp.mor_rate = record._3
              paramsListTmp.ch_mor_rate = record._4
              paramsListTmp.load_time = record._5
              paramsListTmp.params_Type = "real_risk"
              paramsList += paramsListTmp
            }
            /**
              * VALUES(p_num, risk_rank, mor_rate, ch_mor_rate, load_time)
              */
            val insertNum = jdbcWrapper.doBatch("INSERT INTO real_risk VALUES(?,?,?,?,?)", paramsList)
            println("INSERT TABLE real_risk: " + insertNum.mkString(", "))
          })
        })
        ssc.start()
        ssc.awaitTermination()
      }
      def gainRiskRank(rate: Double): String = {
        var result = ""
        if(rate>=0.75 && rate<0.8){
          result = "R1"
        }else if(rate >=0.80 && rate<=1){
          result = "R2"
        }else{
          result = "G1"
        }
        result
      }
    }
    /**
      * 結果表對應的參數實體對象
      */
    class ParamsList extends Serializable{
      var p_num: String = _
      var risk_rank: String = _
      var mor_rate: Double = _
      var ch_mor_rate: Double = _
      var load_time:java.util.Date = _
      var params_Type : String = _
      override def toString = s"ParamsList($p_num, $risk_rank, $mor_rate, $ch_mor_rate, $load_time)"
    }
     3.測試 
    

    啟動ZK、Kafka、Canal。在canal_test庫下的policy_cred表中插入或者修改數據,然后查看real_result庫下的real_risk表中結果。

    更新一條數據時Kafka接收到的json數據如下(這是canal投送到Kafka中的數據格式,包含原始數據、修改后的數據、庫名、表名等信息):

    {
      "data": [
        {
          "p_num": "1",
          "policy_status": "1",
          "mor_rate": "0.8800",
          "load_time": "2019-03-17 12:54:57"
        }
      ],
      "database": "canal_test",
      "es": 1552698141000,
      "id": 10,
      "isDdl": false,
      "mysqlType": {
        "p_num": "varchar(22)",
        "policy_status": "varchar(2)",
        "mor_rate": "decimal(20,4)",
        "load_time": "datetime"
      },
      "old": [
        {
          "mor_rate": "0.5500"
        }
      ],
      "sql": "",
      "sqlType": {
        "p_num": 12,
        "policy_status": 12,
        "mor_rate": 3,
        "load_time": 93
      },
      "table": "policy_cred",
      "ts": 1552698141621,
      "type": "UPDATE"
    }
    

    查看Mysql中的結果表:

     4. 出現的問題

    在開發Spark代碼是有時項目可能會引入大量的依賴包,依賴包之間可能就會發生沖突,比如發生如下錯誤:

    Exception in thread "main" java.lang.NoSuchMethodError: io.netty.buffer.PooledByteBufAllocator.<init>(ZIIIIIIIZ)V
        at org.apache.spark.network.util.NettyUtils.createPooledByteBufAllocator(NettyUtils.java:120)
        at org.apache.spark.network.client.TransportClientFactory.<init>(TransportClientFactory.java:106)
        at org.apache.spark.network.TransportContext.createClientFactory(TransportContext.java:99)
        at org.apache.spark.rpc.netty.NettyRpcEnv.<init>(NettyRpcEnv.scala:71)
        at org.apache.spark.rpc.netty.NettyRpcEnvFactory.create(NettyRpcEnv.scala:461)
        at org.apache.spark.rpc.RpcEnv$.create(RpcEnv.scala:57)
        at org.apache.spark.SparkEnv$.create(SparkEnv.scala:249)
        at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:175)
        at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:257)
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:424)
        at org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:838)
        at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:85)
        at yore.spark.M_PolicyCreditApp$.main(M_PolicyCreditApp.scala:33)
        at yore.spark.M_PolicyCreditApp.main(M_PolicyCreditApp.scala)
    

    我們可以在項目的根目錄下的命令窗口中輸人:mvn dependency:tree -Dverbose> dependency.log

    然后可以在項目根目錄下生產一個dependency.log文件,查看這個文件,在文件中搜索 io.netty 關鍵字,找到其所在的依賴包:

    然就在canal.client將io.netty排除掉:

    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <version>${canal.client.version}</version>
        <exclusions>
            <exclusion>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    
    mysqlcanal
    本作品采用《CC 協議》,轉載必須注明作者和本文鏈接
    假設Mysqlcanal_test庫下有一張表policy_cred,需要統計實時統計policy_status狀態為1的mor_rate的的變化趨勢,并標注比率的風險預警等級。?本次安裝的canal版本為1.1.2,Canal版本最后在1.1.1之后。server端采用MQ模式,MQ選用Kafka。服務器系統為Centos
    F-vuln(全稱:Find-Vulnerability)是為了自己工作方便專門編寫的一款自動化工具,主要適用于日常安全服務、滲透測試人員和RedTeam紅隊人員,它集合的功能包括:存活IP探測、開放端口探測、web服務探測、web漏洞掃描、smb爆破、ssh爆破、ftp爆破、mssql爆破等其他數據庫爆破工作以及大量web漏洞檢測模塊。
    mysql提權總結
    2021-09-17 15:04:08
    使用過MySQL的人都知道,MySQL有很多內置函數提供給使用者,包括字符串函數、數值函數、日期和時間函數等,給開發人員和使用者帶來了很多方便。
    當我們遇到任意文件讀取漏洞的時候,我們需要考慮如何通過這一個小點去擴大我們的成果,達到最大化利用的目的。本篇文章主要介紹了一些在我們拿到任意文件讀取點后的深入利用姿勢,希望對大家能有所幫助。來源作者lxlxlx@深信服西部天威戰隊常見讀取的敏感文件路徑windowsC:\boot.ini //查看系統版本。當下次訪問相同計算機時,OpenSSH會核對公鑰。如果公鑰不同,OpenSSH會發出警告, 避免你受到DNS Hijack之類的攻擊。
    同時又在默認的系統環境下,通過pip安裝部署,大多數情況下,是不會出現問題。相關的依賴庫,在Python安裝之后,都是通過pip來安裝。但實際我們裝個python2.7版本的環境就基本滿足目前的需求。我們創建了一個叫做py27的虛擬環境。workonpy272.4 安裝Opencananrypip在安裝opencanary時,會自動安裝他所需求要的各種依賴,一般不出問題的話,一切都會順利安裝完成。
    近年來,隨著大數據技術產品的不斷發展和多樣化,各個應用系統也會依據不同的業務場景選擇多個不同的技術組件,數據也隨之散落在各個存儲平臺之中。這種狀況給后續數據分析師在不同數據源之上進行數據的即席關聯查詢和分析帶來了新的難題,本文介紹了在數據不移動的前提下進行多源數據即席訪問的具體探索與實踐。
    網絡上關于ADCS利用各類的學習文檔資源其實也有好些了,不過更多看都是針對RELAY部分,其他幾個利用方式都沒怎么提及特別多,可能確實實用程度沒有那么高,斷斷續續幾個白天晚上終于給整的差不太多了。
    可以看見命令執行成功。接下來直接寫入一個shell ,使用php 的 file_put_contents 函數寫個一句話木馬:http:. 驗證成功,直接使用蟻劍進行連接。對 192.168.22.x/24 網段進行掃描:.雖然在之前的掃描中發現了Target2 80端口存在一個cms,但是 滲透總是照著軟柿子捏,我們嘗試使用nmap 對Target2 進行端口掃描,尋找其開放的端口。Bage cms 滲透通過查看網頁底部指紋信息,知道這是一個bage cms搭建的網站,而且從時間上來看有些老舊。
    作為一名 Linux 工程師,會寫好的腳本不僅能提高工作效率,還能有更多的時間做自己的事。最近在網上沖浪的時候,也注意收集一些大佬寫過的腳本,匯總整理一下,歡迎收藏,與君共勉!
    Docker網絡原理容器是相對獨立的環境,相當于一個小型的Linux系統,外界無法直接訪問,那他是怎么做的呢,這里我們先了解下Linux veth pair。如下圖所示:veth pair將兩個網絡veth0和veth1連通。lo和eth0在我們的虛擬機啟動的時候就會創建,但是docker0在我們安裝了docker的時候就會創建。
    VSole
    網絡安全專家
      亚洲 欧美 自拍 唯美 另类