2025年2月

相关内容 node.js实现内网穿透: https://www.jianshu.com/p/d2d4f8bff599 kotlin实现内网穿透: https://www.jianshu.com/p/c8dc095c758e

最大设计连接数: 65535

前面写了个udp转tcp再转udp的工具, 打算用它和tcp内网穿透结合使用 来实现udp内网穿透, 但是在实际使用中发现存在网速较慢的问题, 初步判断为运营商网络问题(使用http下载也一样, 使用单线程只能达到1MB/S内, 3条就可以达到10MB/S. 上传没有问题). 这个问题没法解决就只好再写个udp版. 本来想用udp打洞写的, 但是有一个网络不支持... 只好用服务器转发写, 但是仍然存在一个小问题, 暂时不打算解决. 结尾会讲.

实现代码 服务端: package main

import ( "crypto/rsa" "crypto/sha1" "crypto/x509" "encoding/pem" "errors" "fmt" "log" "net" "os" "strings" "sync" "time" )

var ( privateKeyStr = "-----BEGIN PRIVATE KEY-----\n" + "MIICdQIBADANBgkqhkiG9w0BAQEFAASCAl8wggJbAgEAAoGBAJYBf37uy0sVXyxb\n" + "bMDjvQzxv/ke3UWCSkYhUd8e+MjGHeT8A9V9aVemg3qUogND/Pgtlz6bTd9p5H+q\n" + "OCXnrZbSwdAN7O/9x3zhRzaEOH+9OJ8vpF80DuatbdqqJXFPiDO+nfbufNyT8n+b\n" + "N9UXISAVv7Nay+vTEySD401czDDzAgMBAAECgYAE9u26TLrrtDxfInN5+s+R8xpQ\n" + "a2YVW9eLdKTaBpNjSbNJldGmqiznWrp1PyARjZl8uT2NM+Si5UVLuF19W6qSC1Tw\n" + "bn2brBSsFsufKQff0XXRsD8OqKT5h5/PlRATYgisZonD/v0SPfDHROcNCFiOelEv\n" + "kKZAJgJS3vghZx5jCQJBAMTb4esPOApK+6wXlfVShvRiac9UWW9KBYLwpqya5Cjd\n" + "X/QUGVwywMFkNgRnBz7yWdJd1zuXfuI77N87BXYOE68CQQDDEjfaKCyZJ73RfNJo\n" + "ED5DRfZXm86RPBDlZznJ4shjNVbk1sGClYAk0WuAHeIZJVpm1HpME6NSCfumqeOq\n" + "z1P9AkAQu+xJcgK+hT89ksexkfFc5ty9vhrYJf+v8MsKUyRgAOl+MxMwzjOqfN1G\n" + "pIduJ2XRRx7btvYXPybUlwzQy0OLAkBIexlznt/LXH/kOcv4TKjF2FYLAWKEhlwE\n" + "0REg2Xn5mtUZnE40lhYSGBoodXIQQ9fOQ37Zi6ZwkjMGHzPvwK+FAkAe9gHRMI2u\n" + "lzwVp/AQntBMXmw92IULIlfRmfV1jDBYuT0JHUUGZqfCrz+iDW8Ot24QBzLbwKxJ\n" + "JRrmXbUxObmC\n" + "-----END PRIVATE KEY-----" natDispatcherAddr = ":8989" natPasswords = []string{"yzh"} timeOut = time.Second * 60 * 2 serverMap = map[string]*Server{} mapMutex sync.Mutex clientIndex = 0 maxLength = 0 )

func main() { log.Println("入参: " + strings.Join(os.Args[1:], " ")) if len(os.Args) == 2 { natDispatcherAddr = os.Args[1] } log.SetFlags(log.LstdFlags | log.Lshortfile) startServer() }

func Start(natDispatcherAddrStr string, natPasswordsArr []string) { natDispatcherAddr = natDispatcherAddrStr natPasswords = natPasswordsArr startServer() }

func startServer() { log.Println("NatU分发服务地址: " + natDispatcherAddr) go func() { for { time.Sleep(time.Second * 60) println() log.Println("natU转发Size:", len(serverMap), ",maxLength:", maxLength) mapMutex.Lock() for _, value := range serverMap { log.Println("natU转发中:", value.toString()) } mapMutex.Unlock() } }() privateKey, err := loadPrivateKey(privateKeyStr) if err != nil { log.Panicln(err) }

listenerAddr, err := net.ResolveUDPAddr("udp", natDispatcherAddr)
if err != nil {
    log.Println(err)
    return
}
network := "udp"
if listenerAddr.IP.To4() != nil {
    network = "udp4"
} else if listenerAddr.IP.To16() != nil {
    network = "udp6"
}
listenerConn, err := net.ListenUDP(network, listenerAddr)
if err != nil {
    log.Println(err)
    return
}
defer listenerConn.Close()
buffer := make([]byte, 1024*64)
for {
    // log.Println("监听消息")
    n, clientAddr, err := listenerConn.ReadFromUDP(buffer)
    if err != nil {
        log.Println(err)
        if errors.Is(err, net.ErrClosed) {
            break
        }
        time.Sleep(time.Second)
        continue
    }
    if n > maxLength {
        maxLength = n
    }
    if n < 1 {
        log.Println("异常消息:", clientAddr)
        continue
    }
    data := make([]byte, n)
    copy(data, buffer)

    cmd := data[0]
    // log.Println("收到消息:", cmd, clientAddr, "=>", listenerConn.LocalAddr())

    switch {
    case cmd == 1:
        // log.Println("心跳数据")
        realData := data[1:]
        // 鉴权
        value := handleNatUAuth(realData, privateKey, clientAddr, listenerConn)
        listenerConn.SetWriteDeadline(time.Now().Add(time.Second * 3))
        listenerConn.WriteToUDP([]byte{value}, clientAddr)
    case cmd == 0:
        // 真实数据
        cIndex := int(data[1])*256 + int(data[2])
        realData := data[3:]
        // log.Println("转发到:", cIndex)
        handleRealData(cIndex, realData)
    }
}

}

func handleRealData(cIndex int, realData []byte) { servers := []*Server{} mapMutex.Lock() for _, server := range serverMap { servers = append(servers, server) } mapMutex.Unlock() for _, server := range servers { server.clientMutex.Lock() for _, value := range server.clientMap { if value.index == cIndex { // log.Println("转成功:", cIndex, value.clientAddr) value.openConn.SetWriteDeadline(time.Now().Add(time.Second * 3)) value.openConn.WriteToUDP(realData, value.clientAddr) value.lastLime = time.Now() break } } server.clientMutex.Unlock() } }

func handleNatUAuth(cmdData []byte, privateKey *rsa.PrivateKey, clientAddr *net.UDPAddr, listenerConn *net.UDPConn) byte { // decryptedText, err := rsa.DecryptPKCS1v15(nil, privateKey, cmdData) decryptedText, err := rsa.DecryptOAEP(sha1.New(), nil, privateKey, cmdData, nil) if err != nil { log.Println("DecryptPKCS1v15 err", err) return 3 } info := string(decryptedText) infos := strings.Split(info, "-") if len(infos) < 2 { log.Println("infos error", infos) return 4 } version := 1 pwdOk := false for _, v := range natPasswords { if v == infos[1] { version = 1 pwdOk = true break } } for _, v := range natPasswords { if v == infos[0] { version = 2 pwdOk = true break } } if !pwdOk { log.Println("密码错误", infos) return 5 } var openAddrs []string switch version { case 1: openAddrs = infos[:1] case 2: openAddrs = infos[1:] default: openAddrs = infos[:1] } // log.Println("openAddrs:", openAddrs)

for index, address := range openAddrs {
    server := getServer(address, index, version, clientAddr, strings.Join(openAddrs, "-"), listenerConn)
    if server == nil {
        log.Println("getServer nil")
        return 6
    }
}
return 1

}

func getServer(address string, index int, version int, clientAddr *net.UDPAddr, openAddrsStr string, listenerConn *net.UDPConn) *Server { addr := strings.Split(address, ":") if len(addr) < 2 { log.Println("address error", address) return nil } port := addr[len(addr)-1] mapMutex.Lock() defer mapMutex.Unlock() server := serverMap[port] if server != nil { if server.address != address { log.Println("端口重复", server.address, address) return nil } if server.openAddrsStr != openAddrsStr { log.Println("openAddrsStr不同", server.openAddrsStr, openAddrsStr) return nil } if server.version != version { log.Println("版本不同", server.version, version) return nil } } else { listenerAddr, err := net.ResolveUDPAddr("udp", address) if err != nil { log.Println(err) return nil } network := "udp" if listenerAddr.IP.To4() != nil { network = "udp4" } else if listenerAddr.IP.To16() != nil { network = "udp6" } openConn, err := net.ListenUDP(network, listenerAddr) if err != nil { log.Println(err) return nil } log.Println("开放地址:", address, "对方index:", index, "版本:", version) server = &Server{address: address, index: index, version: version, createTime: time.Now(), openConn: openConn, openAddrsStr: openAddrsStr, clientMap: map[string]*Client{}} serverMap[port] = server go server.accept(port, listenerConn) } server.clientAddr = clientAddr server.openConn.SetReadDeadline(time.Now().Add(timeOut)) return server }

type Server struct { openAddrsStr string index int version int createTime time.Time address string openConn *net.UDPConn clientAddr *net.UDPAddr clientMap map[string]*Client clientMutex sync.Mutex }

type Client struct { index int lastLime time.Time clientAddr *net.UDPAddr openConn *net.UDPConn }

func (server *Server) accept(port string, listenerConn net.UDPConn) { buffer := make([]byte, 102464) for { n, clientAddr, err := server.openConn.ReadFromUDP(buffer) if err != nil { log.Println(err) break } if n > maxLength { maxLength = n } if n < 1 { log.Println("空消息:", clientAddr, port) continue } data := make([]byte, n) copy(data, buffer) // log.Println("转发消息:", len, clientAddr, "=>", server.index)

    server.clientMutex.Lock()
    client := server.clientMap[clientAddr.String()]
    if client == nil {
    getClient:
        for {
            clientIndex++
            index := clientIndex
            for key, value := range server.clientMap {
                if value.index == index {
                    log.Println("index无效:", index)
                    continue getClient
                }
                if time.Since(value.lastLime) >= timeOut {
                    delete(server.clientMap, key)
                }
            }
            client = &Client{index: index, clientAddr: clientAddr, openConn: server.openConn}
            server.clientMap[clientAddr.String()] = client
            break
        }
    }
    client.lastLime = time.Now()
    server.clientMutex.Unlock()
    listenerConn.SetWriteDeadline(time.Now().Add(time.Second * 3))
    listenerConn.WriteToUDP(append([]byte{byte(10 + server.index), byte(client.index / 256), byte(client.index % 256)}, data...), server.clientAddr)
}
server.openConn.Close()
mapMutex.Lock()
delete(serverMap, port)
mapMutex.Unlock()
log.Println("释放端口:", port)

}

func (server *Server) toString() string { ms := time.Since(server.createTime).Milliseconds() s := ms / 1000 m := s / 60 h := m / 60 runTime := fmt.Sprintf("%d天%d时%d分%d秒", h/24, h%24, m%60, s%60) return fmt.Sprintf("%s=>%s, index: %d, version: %d, %s", server.address, server.clientAddr, server.index, server.version, runTime) }

func loadPrivateKey(privateKeyStr string) (privateKey *rsa.PrivateKey, err error) { block, _ := pem.Decode([]byte(privateKeyStr)) if block == nil { return nil, fmt.Errorf("解码私钥失败") } key, err := x509.ParsePKCS8PrivateKey(block.Bytes) if err != nil { return nil, err } privateKey, ok := key.(*rsa.PrivateKey) if !ok { return nil, fmt.Errorf("非法私钥文件") } return privateKey, nil }

客户端: package main

import ( "crypto/rand" "crypto/rsa" "crypto/sha1" "crypto/x509" "encoding/pem" "errors" "fmt" "log" "net" "os" "strings" "sync" "time" )

var ( publicKeyStr = "-----BEGIN PUBLIC KEY-----\nMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQCWAX9+7stLFV8sW2zA470M8b/5\nHt1FgkpGIVHfHvjIxh3k/APVfWlXpoN6lKIDQ/z4LZc+m03faeR/qjgl562W0sHQ\nDezv/cd84Uc2hDh/vTifL6RfNA7mrW3aqiVxT4gzvp327nzck/J/mzfVFyEgFb+z\nWsvr0xMkg+NNXMww8wIDAQAB\n-----END PUBLIC KEY-----\n"

natDispatcherAddr = "127.0.0.1:8989"
natPassword       = "yzh"
rateInterval      = time.Second * 15
timeOut           = time.Second * 60 * 2
natMapArr         = []string{
    ":1701-192.168.3.25:1701",
    ":11771-127.0.0.1:1771",
    ":11772-127.0.0.1:1772",
    ":11773-127.0.0.1:1773",
}
errMap = map[int]string{
    2: "config error or port used",
    3: "DecryptPKCS1v15 err",
    4: "infos error",
    5: "密码错误",
    6: "getServer nil",
    7: "",
    8: "",
    9: "",
}
listenerConn *net.UDPConn
serverAddr   *net.UDPAddr
mapMutex     sync.Mutex
forwardMap   = map[int]*net.UDPConn{}
natSuccess   = false
maxLength    = 0

)

func main() { log.Println("入参: " + strings.Join(os.Args[1:], " ")) if len(os.Args) == 2 { natDispatcherAddr = os.Args[1] } log.SetFlags(log.LstdFlags | log.Lshortfile) startClient() }

func Start(natDispatcherAddrStr string, natPasswordStr string, natMapStr []string) { natDispatcherAddr = natDispatcherAddrStr natPassword = natPasswordStr natMapArr = natMapStr startClient() }

func startClient() { log.Println("NatU分发服务地址: " + natDispatcherAddr) publicKey, err := loadPublicKey(publicKeyStr) if err != nil { log.Panicln(err) } natParams := []string{natPassword} // 通过index找到需要转发的位置 localServerAddrs := []string{} for _, v := range natMapArr { mapArr := strings.Split(v, "-") natServerOpenAddr := mapArr[0] localServerAddr := mapArr[1] log.Println("NatU服务器开放地址: "+natServerOpenAddr, "本地服务地址: "+localServerAddr) natParams = append(natParams, natServerOpenAddr) localServerAddrs = append(localServerAddrs, localServerAddr) } //密码, 开放端口 natInfo, err := rsa.EncryptOAEP(sha1.New(), rand.Reader, publicKey, []byte(strings.Join(natParams, "-")), nil) if err != nil { log.Panicln(err) } go func() { for { log.Println("startNat") creatClient(localServerAddrs) listenerConn = nil serverAddr = nil natSuccess = false time.Sleep(time.Second) } }()

go func() {
    for {
        if listenerConn == nil || serverAddr == nil {
            time.Sleep(time.Second)
            continue
        }
        // log.Println("主动发消息:", listenerConn.LocalAddr(), "=>", serverAddr)
        data := append([]byte{1}, natInfo...)
        listenerConn.SetWriteDeadline(time.Now().Add(time.Second * 5))
        listenerConn.WriteToUDP(data, serverAddr)
        time.Sleep(rateInterval)
    }
}()
for {
    time.Sleep(time.Second * 60)
    println()
    log.Println("natU转发Size:", len(natMapArr), ",maxLength:", maxLength, ",natSuccess:", natSuccess)
    for _, value := range natMapArr {
        log.Println("natU转发中:", strings.ReplaceAll(value, "-", "=>"))
    }
}

}

func creatClient(localServerAddrs []string) { var err error serverAddr, err = net.ResolveUDPAddr("udp", natDispatcherAddr) if err != nil { log.Println(err) return } listenerAddr, err := net.ResolveUDPAddr("udp", ":0") if err != nil { log.Println(err) return } listenerConn, err = net.ListenUDP("udp", listenerAddr) if err != nil { log.Println(err) return } defer listenerConn.Close() buffer := make([]byte, 1024*64) for { listenerConn.SetReadDeadline(time.Now().Add(timeOut)) n, clientAddr, err := listenerConn.ReadFromUDP(buffer) if err != nil { log.Println(err) break } if n > maxLength { maxLength = n } if n < 1 { log.Println("异常消息:", clientAddr) continue } data := make([]byte, n) copy(data, buffer)

    // if clientAddr.Port != serverAddr.Port || clientAddr.IP.String() != serverAddr.IP.String() {
    if clientAddr.Port != serverAddr.Port {
        log.Println("异常消息:", serverAddr, clientAddr)
        continue
    }
    cmd := data[0]
    // log.Println("收到响应:", cmd, clientAddr, "=>", listenerConn.LocalAddr())
    switch {
    case cmd == 1:
        if !natSuccess {
            natSuccess = true
            log.Println("natU建立成功")
        }
        // log.Println("心跳数据")
    case cmd > 1 && cmd < 10:
        errMsg := errMap[int(cmd)]
        if errMsg == "" {
            errMsg = "config error"
        }
        log.Panicln(cmd, errMsg, natMapArr)
    case cmd >= 10:
        forwardAddress := localServerAddrs[data[0]-10]
        clinetIndex := int(data[1])*256 + int(data[2])
        realData := data[3:]
        // log.Println("转发到:", forwardAddress)
        handleClientRequest(clientAddr, realData, forwardAddress, clinetIndex)
    }
}

}

func handleClientRequest(clientAddr net.UDPAddr, clientData []byte, forwardAddress string, clinetIndex int) { if clientAddr == nil { return } clientAddrString := clientAddr.String() mapMutex.Lock() defer mapMutex.Unlock() forwardConn := forwardMap[clinetIndex] if forwardConn == nil { forwardAddr, err := net.ResolveUDPAddr("udp", forwardAddress) if err != nil { log.Println(err) return } forwardConn, err = net.DialUDP("udp", nil, forwardAddr) if err != nil { log.Println(err) return } infoStr := clientAddrString + "=>" + forwardAddress + "=>" + forwardConn.LocalAddr().String() + "=>" + forwardConn.RemoteAddr().String() log.Println("添加udp转发:" + infoStr) forwardMap[clinetIndex] = forwardConn buffer := make([]byte, 102464) go func() { defer forwardConn.Close() forwardSuccess := false for { forwardConn.SetReadDeadline(time.Now().Add(timeOut)) n, serverAddr, err := forwardConn.ReadFromUDP(buffer) if err != nil { log.Println(err) if errors.Is(err, net.ErrClosed) { break } if nerr, ok := err.(net.Error); ok && nerr.Timeout() { break } time.Sleep(time.Second) continue } // if serverAddr.Port != forwardAddr.Port || serverAddr.IP.String() != forwardAddr.IP.String() { if serverAddr.Port != forwardAddr.Port { log.Println("异常消息:", serverAddr.String(), forwardAddr.String()) continue } if n > maxLength { maxLength = n } if !forwardSuccess { forwardSuccess = true log.Println("udp转发成功:", serverAddr.String(), n, clientAddrString) } // log.Println("服务端消息:", serverAddr.String(), len, clientAddrString) data := make([]byte, n) copy(data, buffer) listenerConn.SetWriteDeadline(time.Now().Add(time.Second * 5)) listenerConn.WriteToUDP(append([]byte{0, byte(clinetIndex / 256), byte(clinetIndex % 256)}, data...), clientAddr) } log.Println("移除udp:" + infoStr) mapMutex.Lock() delete(forwardMap, clinetIndex) mapMutex.Unlock() }() } // log.Println("客户端消息:", clientAddrString, len(clientData)) forwardConn.SetWriteDeadline(time.Now().Add(time.Second * 5)) forwardConn.Write(clientData) }

func loadPublicKey(publicKeyStr string) (publicKey *rsa.PublicKey, err error) { block, _ := pem.Decode([]byte(publicKeyStr)) if block == nil { return nil, fmt.Errorf("解码公钥失败") } key, err := x509.ParsePKIXPublicKey(block.Bytes) if err != nil { return nil, err } publicKey, ok := key.(*rsa.PublicKey) if !ok { return nil, fmt.Errorf("非法公钥文件") } return publicKey, nil }

存在的问题: 客户端fd00::2给fd00::5发消息 服务端知道是fd00::2发来的, 但是不知道是哪个ip接收的, 也无法控制使用哪个ip回消息, 测试中发现服务端可能会用fd00::6发消息给fd00::2, 在部分网络下这个消息是发送不过去的(这也是我没用打洞法的原因), 问题点就在这里. 解决方案也很简单, 分别监听每个ip, 但是需要监听设备ip的变化, 不想这样做. 不知道有没有大佬有更好的解决方案

画个草图好理解些:

image.png

作者:今天i你好吗 链接:https://www.jianshu.com/p/c3191a36ee84 来源:简书 著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。


1.简介  当今部署的中间件大多都是在C/S架构上设计的,其中相对隐匿的客户机主动向周知的服务端(拥有静态IP地址和DNS名称)发起链接请求。大多数中间件实现了一种非对称的通讯模型,即内网中的主机可以初始化对外的链接,而外网的主机却不能初始化对内网的链接,除非经过中间件管理员特殊配置。在中间件为常见的NAPT的情况下(也是本文主要讨论的),内网中的客户端没有单独的公网IP地址,而是通过NAPT转换,和其他同一内网用户共享一个公网IP。这种内网主机隐藏在中间件后的不可访问性对于一些客户端

软件如浏览器来说并不是一个问题,因为其只需要初始化对外的链接,从某方面来看反而还对隐私保护有好处。

  然而在P2P应用中,内网主机(客户端)需要对另外的终端(Peer)直接建立链接,但是发起者和响应者可能在不同的中间件后面,两者都没有公网IP地址。而外部对NAT公网IP和端口主动的链接或数据都会因内网未请求被丢弃掉。本文讨论的就是如何跨越NAT实现内网主机直接通讯的问题。

2.术语 防火墙(Firewall):   防火墙主要限制内网和公网的通讯,通常丢弃未经许可的数据包。防火墙会检测(但是不修改)试图进入内网数据包的IP地址和TCP/UDP端口信息。

网络地址转换器(NAT):   NAT不止检查进入数据包的头部,而且对其进行修改,从而实现同一内网中不同主机共用更少的公网IP(通常是一个)。

基本NAT(Basic NAT):   基本NAT会将内网主机的IP地址映射为一个公网IP,不改变其TCP/UDP端口号。基本NAT通常只有在当NAT有公网IP池的时候才有用。

网络地址-端口转换器(NAPT):   到目前为止最常见的即为NAPT,其检测并修改出入数据包的IP地址和端口号,从而允许多个内网主机同时共享一个公网IP地址。

锥形NAT(Cone NAT):   在建立了一对(公网IP,公网端口)和(内网IP,内网端口)二元组的绑定之后,Cone NAT会重用这组绑定用于接下来该应用程序的所有会话(同一内网IP和端口),只要还有一个会话还是激活的。

  例如,假设客户端A建立了两个连续的对外会话,从相同的内部端点(10.0.0.1:1234)到两个不同的外部服务端S1和S2。Cone NAT只为两个会话映射了一个公网端点(155.99.25.11:62000),确保客户端端口的“身份”在地址转换的时候保持不变。由于基本NAT和防火墙都不改变数据包的端口号,因此这些类型的中间件也可以看作是退化的Cone NAT。

对称NAT(Symmetric NAT)   对称NAT正好相反,不在所有公网-内网对的会话中维持一个固定的端口绑定。其为每个新的会话开辟一个新的端口。如下图所示:

其中Cone NAT根据NAT如何接收已经建立的(公网IP,公网端口)对的输入数据还可以细分为以下三类:

  1. 全锥形NAT(Full Cone NAT)   在一个新会话建立了公网/内网端口绑定之后,全锥形NAT接下来会接受对应公网端口的所有数据,无论是来自哪个(公网)终端。全锥NAT有时候也被称为“混杂”NAT(promiscuous NAT)。

  2. 受限锥形NAT(Restricted Cone NAT)   受限锥形NAT只会转发符合某个条件的输入数据包。条件为:外部(源)IP地址匹配内网主机之前发送一个或多个数据包的结点的IP地址。受限NAT通过限制输入数据包为一组“已知的”外部IP地址,有效地精简了防火墙的规则。

  3. 端口受限锥形NAT(Port-Restricted Cone NAT)   端口受限锥形NAT也类似,只当外部数据包的IP地址和端口号都匹配内网主机发送过的地址和端口号时才进行转发。端口受限锥形NAT为内部结点提供了和对称NAT相同等级的保护,以隔离未关联的数据。

  1. P2P通信   根据客户端的不同,客户端之间进行P2P传输的方法也略有不同,这里介绍了现有的穿越中间件进行P2P通信的几种技术。

3.1 中继(Relaying)   这是最可靠但也是最低效的一种P2P通信实现。其原理是通过一个有公网IP的服务器中间人对两个内网客户端的通信数据进行中继和转发。如下图所示:

  客户端A和客户端B不直接通信,而是先都与服务端S建立链接,然后再通过S和对方建立的通路来中继传递的数据。这钟方法的缺陷很明显,当链接的客户端变多之后,会显著增加服务器的负担,完全没体现出P2P的优势。

3.2 逆向链接(Connection reversal)   第二种方法在当两个端点中有一个不存在中间件的时候有效。例如,客户端A在NAT之后而客户端B拥有全局IP地址,如下图:

  客户端A内网地址为10.0.0.1,且应用程序正在使用TCP端口1234。A和服务器S建立了一个链接,服务器的IP地址为18.181.0.31,监听1235端口。NAT A给客户端A分配了TCP端口62000,地址为NAT的公网IP地址155.99.25.11,作为客户端A对外当前会话的临时IP和端口。因此S认为客户端A就是155.99.25.11:62000。而B由于有公网地址,所以对S来说B就是138.76.29.7:1234。

  当客户端B想要发起一个对客户端A的P2P链接时,要么链接A的外网地址155.99.25.11:62000,要么链接A的内网地址10.0.0.1:1234,然而两种方式链接都会失败。链接10.0.0.1:1234失败自不用说,为什么链接155.99.25.11:62000也会失败呢?来自B的TCP SYN握手请求到达NAT A的时候会被拒绝,因为对NAT A来说只有外出的链接才是允许的。

  在直接链接A失败之后,B可以通过S向A中继一个链接请求,从而从A方向“逆向“地建立起A-B之间的点对点链接。

  很多当前的P2P系统都实现了这种技术,但其局限性也是很明显的,只有当其中一方有公网IP时链接才能建立。越来越多的情况下,通信的双方都在NAT之后,因此就要用到我们下面介绍的第三种技术了。

3.3 UDP打洞(UDP hole punching)   第三种P2P通信技术,被广泛采用的,名为“P2P打洞“。P2P打洞技术依赖于通常防火墙和cone NAT允许正当的P2P应用程序在中间件中打洞且与对方建立直接链接的特性。以下主要考虑两种常见的场景,以及应用程序如何设计去完美地处理这些情况。第一种场景代表了大多数情况,即两个需要直接链接的客户端处在两个不同的NAT之后;第二种场景是两个客户端在同一个NAT之后,但客户端自己并不需要知道。

3.3.1. 端点在不同的NAT之下   假设客户端A和客户端B的地址都是内网地址,且在不同的NAT后面。A、B上运行的P2P应用程序和服务器S都使用了UDP端口1234,A和B分别初始化了与Server的UDP通信,地址映射如图所示:

  现在假设客户端A打算与客户端B直接建立一个UDP通信会话。如果A直接给B的公网地址138.76.29.7:31000发送UDP数据,NAT B将很可能会无视进入的数据(除非是Full Cone NAT),因为源地址和端口与S不匹配,而最初只与S建立过会话。B往A直接发信息也类似。

  假设A开始给B的公网地址发送UDP数据的同时,给服务器S发送一个中继请求,要求B开始给A的公网地址发送UDP信息。A往B的输出信息会导致NAT A打开一个A的内网地址与与B的外网地址之间的新通讯会话,B往A亦然。一旦新的UDP会话在两个方向都打开之后,客户端A和客户端B就能直接通讯,而无须再通过引导服务器S了。

  UDP打洞技术有许多有用的性质。一旦一个的P2P链接建立,链接的双方都能反过来作为“引导服务器”来帮助其他中间件后的客户端进行打洞,极大减少了服务器的负载。应用程序不需要知道中间件具体是什么(如果有的话),因为以上的过程在没有中间件或者有多个中间件的情况下也一样能建立通信链路。

3.3.2. 端点在相同的NAT之下   现在考虑这样一种情景,两个客户端A和B正好在同一个NAT之后(而且可能他们自己并不知道),因此在同一个内网网段之内。客户端A和服务器S建立了一个UDP会话,NAT为此分配了公网端口62000,B同样和S建立会话,分配到了端口62001,如下图:

  假设A和B使用了上节介绍的UDP打洞技术来建立P2P通路,那么会发生什么呢?首先A和B会得到由S观测到的对方的公网IP和端口号,然后给对方的地址发送信息。两个客户端只有在NAT允许内网主机对内网其他主机发起UDP会话的时候才能正常通信,我们把这种情况称之为"回环传输“(lookback translation),因为从内部到达NAT的数据会被“回送”到内网中而不是转发到外网。例如,当A发送一个UDP数据包给B的公网地址时,数据包最初有源IP地址和端口地址10.0.0.1:1234和目的地址155.99.25.11:62001,NAT收到包后,将其转换为源155.99.25.11:62000(A的公网地址)和目的10.1.1.3:1234,然后再转发给B。即便NAT支持回环传输,这种转换和转发在此情况下也是没必要的,且有可能会增加A与B的对话延时和加重NAT的负担。

  对于这个问题,解决方案是很直观的。当A和B最初通过S交换地址信息时,他们应该包含自身的IP地址和端口号(从自己看),同时也包含从服务器看的自己的地址和端口号。然后客户端同时开始从对方已知的两个的地址中同时开始互相发送数据,并使用第一个成功通信的地址作为对方地址。如果两个客户端在同一个NAT后,发送到对方内网地址的数据最有可能先到达,从而可以建立一条不经过NAT的通信链路;如果两个客户端在不同的NAT之后,发送给对方内网地址的数据包根本就到达不了对方,但仍然可以通过公网地址来建立通路。值得一提的是,虽然这些数据包通过某种方式验证,但是在不同NAT的情况下完全有可能会导致A往B发送的信息发送到其他A内网网段中无关的结点上去的。

3.3.3. 固定端口绑定   UDP打洞技术有一个主要的条件:只有当两个NAT都是Cone NAT(或者非NAT的防火墙)时才能工作。因为其维持了一个给定的(内网IP,内网UDP)二元组和(公网IP, 公网UDP)二元组固定的端口绑定,只要该UDP端口还在使用中,就不会变化。如果像对称NAT一样,给每个新会话分配一个新的公网端口,就会导致UDP应用程序无法使用跟外部端点已经打通了的通信链路。由于Cone NAT是当今最广泛使用的,尽管有一小部分的对称NAT是不支持打洞的,UDP打洞技术也还是被广泛采纳应用。

  1. 具体实现   如果理解了上面所说的内容,那么代码实现起来倒很简单了 。这里采用C++的异步IO库来实现引导服务器和P2P客户端的简单功能,目的是打通两个客户端的通信链路,使两个不同局域网之间的客户端可以实现直接通信。

4.1 引导服务端设计   引导服务器运行在一个有公网地址的设备上,并且接收指定端口的来自客户的命令(这里是用端口号2333)。

客户端其实可以而且也最好应该与服务器建立TCP链接,但我这里为了图方便,也只采用了UDP的通信方式。服务端监听2333端口的命令,然后执行相应的操作,目前包含的命令有:

login, 客户端登录,使得其记录在服务器traker中,让其他peer可以对其发出链接请求。

logout,客户端登出,使其对peer隐藏。因为服务器不会追踪客户端的登录状态。

list,客户端查看目前的登录用户。

punch , 对指定用户(序号)进行打洞。

help, 查看有哪些可用的命令。

4.2 P2P客户端设计   一般的网络编程,都是客户端比服务端要难,因为要处理与服务器的通信同时还要处理来自用户的事件;对于P2P客户端来说更是如此,因为P2P客户端不止作为客户端,同时也作为对等连接的服务器端。

这里的大体思路是,输入命令传输给服务器之后,接收来自服务器的反馈,并执行相应代码。例如A想要与B建立通信链路,先给服务器发送punch命令以及给B发送数据,服务器接到命令后给B发送punch_requst信息以及A的端点信息,B收到之后向A发送数据打通通路,然后A与B就可以进行P2P通信了。经测试,打通通路后即便把服务器关闭,A与B也能正常通信。

具体的代码见https://github.com/pannzh/P2P-Over-MiddleBoxes-Demo/tree/boost_asio, 另外还实现了posix c版本的https://github.com/pannzh/P2P-Over-MiddleBoxes-Demo/tree/master

以及python版本的https://github.com/pannzh/P2P-Over-MiddleBoxes-Demo/tree/python


我们经常会遇到一个问题,如何将本机的服务暴露到公网上,让别人也可以访问。我们知道,在家上网的时候我们有一个 IP 地址,但是这个 IP 地址并不是一个公网的 IP 地址,别人无法通过一个 IP 地址访问到你的服务,所以在例如:微信接口调试、三方对接的时候,你必须将你的服务部署到一个公网的系统中去,这样太累了。 这个时候,内网穿透就出现了,它的作用就是即使你在家的服务,也能被其人访问到。 今天让我们来用一个最简单的案例学习一下如何用 go 来做一个最简单的内网穿透工具。

整体结构 首先我们用几张图来说明一下我们是如何实现的,说清楚之后再来用代码实现一下。

当前网络情况

我们可以看到,画实线的是我们当前可以访问的,画虚线的是我们当前无法进行直接访问的。

我们现在有的路是:

用户主动访问公网服务器是可以的 内网主动访问公网服务也是可以的 当前我们要做的是想办法能让用户访问到内网服务,所以如果能做到公网服务访问到内网服务,那么用户就能间接访问到内网服务了。

想是这么想的,但是实际怎么做呢?用户访问不到内网服务,那我公网服务器同样访问不到吧。所以我们就需要利用现有的链路来完成这件事。

基本架构

内网,客户端(我们要搞一个) 外网,服务端(我们也要搞一个) 访问者,用户 首先我们需要一个控制通道来传递消息,因为只有内网可以访问公网,公网不知道内网在哪里,所以第一次肯定需要客户端主动告诉服务端我在哪 服务端通过 8007 端口监听用户来的请求 当用户发来请求时,服务端需要通过控制信道告诉客户端,有用户来了 客户端收到消息之后建立隧道通道,主动访问服务端的 8008 来建立 TCP 连接 此时客户端需要同时与本地需要暴露的服务 127.0.0.1:8080 建立连接 连接完成后,服务端需要将 8007 的请求转发到隧道端口 8008 中 客户端从隧道中获得用户请求,转发给内网服务,同时将内网服务的返回信息放入隧道 最终请求流向是,如图中的紫色箭头走向,请求返回是如图中红色箭头走向。

需要理解的是,TCP 一旦建立了连接,双方就都可以向对方发送信息了,所以其实原理很简单,就是利用已有的单向路建立 TCP 连接,从而知道对方的位置信息,然后将请求进行转发即可。

代码实现 工具方法 首先我们先定义三个需要使用的工具方法,还需要定义两个消息编码常量,后面会用到

监听一个地址对应的 TCP 请求 CreateTCPListener 连接一个 TCP 地址 CreateTCPConn 将一个 TCP-A 连接的数据写入另一个 TCP-B 连接,将 TCP-B 连接返回的数据写入 TCP-A 的连接中 Join2Conn (别看这短短 10 几行代码,这就是核心了) 代码语言:javascript 复制 package network

import ( "io" "log" "net" )

const ( KeepAlive = "KEEP_ALIVE" NewConnection = "NEW_CONNECTION" )

func CreateTCPListener(addr string) (*net.TCPListener, error) { tcpAddr, err := net.ResolveTCPAddr("tcp", addr) if err != nil { return nil, err } tcpListener, err := net.ListenTCP("tcp", tcpAddr) if err != nil { return nil, err } return tcpListener, nil }

func CreateTCPConn(addr string) (*net.TCPConn, error) { tcpAddr, err := net.ResolveTCPAddr("tcp", addr) if err != nil { return nil, err } tcpListener, err := net.DialTCP("tcp",nil, tcpAddr) if err != nil { return nil, err } return tcpListener, nil }

func Join2Conn(local *net.TCPConn, remote *net.TCPConn) { go joinConn(local, remote) go joinConn(remote, local) }

func joinConn(local *net.TCPConn, remote *net.TCPConn) { defer local.Close() defer remote.Close() _, err := io.Copy(local, remote) if err != nil { log.Println("copy failed ", err.Error()) return } } 客户端 我们先来实现相对简单的客户端,客户端主要做的事情是 3 件:

连接服务端的控制通道 等待服务端从控制通道中发来建立连接的消息 收到建立连接的消息时,将本地服务和远端隧道建立连接(这里就要用到我们的工具方法了) 代码语言:javascript 复制 package main

import ( "bufio" "io" "log" "net"

"nat-proxy/cmd/network" )

var ( // 本地需要暴露的服务端口 localServerAddr = "127.0.0.1:32768"

remoteIP = "111.111.111.111" // 远端的服务控制通道,用来传递控制信息,如出现新连接和心跳 remoteControlAddr = remoteIP + ":8009" // 远端服务端口,用来建立隧道 remoteServerAddr = remoteIP + ":8008" )

func main() { tcpConn, err := network.CreateTCPConn(remoteControlAddr) if err != nil { log.Println("[连接失败]" + remoteControlAddr + err.Error()) return } log.Println("[已连接]" + remoteControlAddr)

reader := bufio.NewReader(tcpConn) for { s, err := reader.ReadString('\n') if err != nil || err == io.EOF { break }

  // 当有新连接信号出现时,新建一个tcp连接
  if s == network.NewConnection+"\n" {
     go connectLocalAndRemote()
  }

}

log.Println("[已断开]" + remoteControlAddr) }

func connectLocalAndRemote() { local := connectLocal() remote := connectRemote()

if local != nil && remote != nil { network.Join2Conn(local, remote) } else { if local != nil { _ = local.Close() } if remote != nil { _ = remote.Close() } } }

func connectLocal() *net.TCPConn { conn, err := network.CreateTCPConn(localServerAddr) if err != nil { log.Println("[连接本地服务失败]" + err.Error()) } return conn }

func connectRemote() *net.TCPConn { conn, err := network.CreateTCPConn(remoteServerAddr) if err != nil { log.Println("[连接远端服务失败]" + err.Error()) } return conn } 服务端 服务端的实现就相对复杂一些了:

监听控制通道,接收客户端的连接请求 监听访问端口,接收来自用户的 http 请求 第二步接收到请求之后需要存放一下这个连接并同时发消息给客户端,告诉客户端有用户访问了,赶紧建立隧道进行通信 监听隧道通道,接收来自客户端的连接请求,将客户端的连接与用户的连接建立起来(也是用工具方法) 代码语言:javascript 复制 package main

import ( "log" "net" "strconv" "sync" "time"

"nat-proxy/cmd/network" )

const ( controlAddr = "0.0.0.0:8009" tunnelAddr = "0.0.0.0:8008" visitAddr = "0.0.0.0:8007" )

var ( clientConn *net.TCPConn connectionPool map[string]*ConnMatch connectionPoolLock sync.Mutex )

type ConnMatch struct { addTime time.Time accept *net.TCPConn }

func main() { connectionPool = make(map[string]*ConnMatch, 32) go createControlChannel() go acceptUserRequest() go acceptClientRequest() cleanConnectionPool() }

// 创建一个控制通道,用于传递控制消息,如:心跳,创建新连接 func createControlChannel() { tcpListener, err := network.CreateTCPListener(controlAddr) if err != nil { panic(err) }

log.Println("[已监听]" + controlAddr) for { tcpConn, err := tcpListener.AcceptTCP() if err != nil { log.Println(err) continue }

  log.Println("[新连接]" + tcpConn.RemoteAddr().String())
  // 如果当前已经有一个客户端存在,则丢弃这个链接
  if clientConn != nil {
     _ = tcpConn.Close()
  } else {
     clientConn = tcpConn
     go keepAlive()
  }

} }

// 和客户端保持一个心跳链接 func keepAlive() { go func() { for { if clientConn == nil { return } _, err := clientConn.Write(([]byte)(network.KeepAlive + "\n")) if err != nil { log.Println("[已断开客户端连接]", clientConn.RemoteAddr()) clientConn = nil return } time.Sleep(time.Second * 3) } }() }

// 监听来自用户的请求 func acceptUserRequest() { tcpListener, err := network.CreateTCPListener(visitAddr) if err != nil { panic(err) } defer tcpListener.Close() for { tcpConn, err := tcpListener.AcceptTCP() if err != nil { continue } addConn2Pool(tcpConn) sendMessage(network.NewConnection + "\n") } }

// 将用户来的连接放入连接池中 func addConn2Pool(accept *net.TCPConn) { connectionPoolLock.Lock() defer connectionPoolLock.Unlock()

now := time.Now() connectionPool[strconv.FormatInt(now.UnixNano(), 10)] = &ConnMatch{now, accept,} }

// 发送给客户端新消息 func sendMessage(message string) { if clientConn == nil { log.Println("[无已连接的客户端]") return } _, err := clientConn.Write([]byte(message)) if err != nil { log.Println("[发送消息异常]: message: ", message) } }

// 接收客户端来的请求并建立隧道 func acceptClientRequest() { tcpListener, err := network.CreateTCPListener(tunnelAddr) if err != nil { panic(err) } defer tcpListener.Close()

for { tcpConn, err := tcpListener.AcceptTCP() if err != nil { continue } go establishTunnel(tcpConn) } }

func establishTunnel(tunnel *net.TCPConn) { connectionPoolLock.Lock() defer connectionPoolLock.Unlock()

for key, connMatch := range connectionPool { if connMatch.accept != nil { go network.Join2Conn(connMatch.accept, tunnel) delete(connectionPool, key) return } }

_ = tunnel.Close() }

func cleanConnectionPool() { for { connectionPoolLock.Lock() for key, connMatch := range connectionPool { if time.Now().Sub(connMatch.addTime) > time.Second*10 { _ = connMatch.accept.Close() delete(connectionPool, key) } } connectionPoolLock.Unlock() time.Sleep(5 * time.Second) } } 其他 其中我加入了 keepalive 的消息,用于保持客户端与服务端的一直正常连接 我们还需要定期清理一下服务端 map 中没有建立成功的连接 实验一下 首先在本机用 dokcer 部署一个 nginx 服务(你可以启动一个 tomcat 都可以的),并修改客户监听端口localServerAddr为127.0.0.1:32768,并修改remoteIP 为服务端 IP 地址。然后访问以下,看到是可以正常访问的。

然后编译打包服务端扔到服务器上启动、客户端本地启动,如果控制台输出连接成功,就完成准备了

现在通过访问服务端的 8007 端口就可以访问我们内网的服务了。

遗留问题 上述的实现是一个最小的实现,也只是为了完成基本功能,还有一些遗留的问题等待你的处理:

现在一个客户端连接上了就不能连接第二个了,那怎么做多个客户端的连接呢? 当前这个 map 的使用其实是有风险的,如何做好连接池的管理? TCP 连接的开销是很大的,如何做好连接的复用? 当前是 TCP 的连接,那么如果是 UDP 如何实现呢? 当前连接都是不加密的,如何进行加密呢? 当前的 keepalive 实现很简单,有没有更优雅的实现方式呢? 这些就交给聪明的你来完成了

总结 其实最后回头看看实现起来并不复杂,用 go 来实现已经是非常简单了,所以 github 上面有很多利用 go 来实现代理或者穿透的工具,我也是参考它们抽离了其中的核心,最重要的就是工具方法中的第三个 copy 了,不过其实还有很多细节点需要考虑的。你可以参考下面的源码继续深入探索一下。

https://github.com/fatedier/frp

https://github.com/snail007/goproxy


首先还是我们需要一个http服务器,这个http服务器是我们的内网的服务器,也就是说我们需要在外网访问到这个位于内网的http服务器。假设我们内网的ip是127.0.0.1,分配的局域网ip是192.168.1.10,然后http端口是8080

那么显而易见,我们在同一内网环境是可以访问的,直接使用192.168.1.10:8000即可访问到服务器

但是如果不在同一局域网的机器就不行了,需要借助一台公网ip的服务器来做一个透传代理。

内网服务器准备 这里假设你已经安装python2或者python3,打开我们的mac终端或者windows cmd 在python2下输入python -m SimpleHTTPServer

python3下输入python -m http.server

这样我们可以快速得到一台http服务器,打开浏览器输入127.0.0.1:8000可以发现是一个文件浏览的http服务器

我们不需要很复杂的http服务器,仅仅用来做测试而已,所以这样是足够的了

服务端代码 控制客户端的监听代码 1.这里选择监听在8009端口,这个tcp服务,主要用来接受客户端的连接请求的,然后发送控制指令给到客户端,请求建立隧道连接的。这里只接受一个客户端的连接请求,如果有多余的会close掉

一旦有客户端连接到8009端口,这个tcp连接是一直保持的,为什么呢?

因为服务端需要发送控制指令给客户端,所以tcp连接必须一直保持。

然后服务端会每隔两秒发送hi这个消息给到客户端,客户端可以直接忽略掉,因为这个hi只是类似心跳机制的保证。

var cache *net.TCPConn = nil func makeControl() { var tcpAddr *net.TCPAddr tcpAddr, _ = net.ResolveTCPAddr("tcp", "127.0.0.1:8009") //打开一个tcp断点监听 tcpListener, err := net.ListenTCP("tcp", tcpAddr) if err != nil { panic(err) } fmt.Println("控制端口已经监听") for { tcpConn, err := tcpListener.AcceptTCP() if err != nil { panic(err) } fmt.Println("新的客户端连接到控制端服务进程:" + tcpConn.RemoteAddr().String()) if cache != nil { fmt.Println("已经存在一个客户端连接!") //直接关闭掉多余的客户端请求 tcpConn.Close() } else { cache = tcpConn } go control(tcpConn) }

func control(conn *net.TCPConn) { go func() { for { //一旦有客户端连接到服务端的话,服务端每隔2秒发送hi消息给到客户端 //如果发送不出去,则认为链路断了,清除cache连接 _, e := conn.Write(([]byte)("hi\n")) if e != nil { cache = nil } time.Sleep(time.Second * 2) } }() } 对外访问的服务端口监听 假设端口是8007,这里的对外访问的服务端口监听,也就是说假设我们服务器ip是10.18.10.1的话,那么访问10.18.10.1的端口8007,就等于请求内网的127.0.0.1:8000 127.0.0.1:8000就是上面的python服务器

和上面的代码看起来很像,但是用处不一样,上面那个主要目的是控制客户端,要求它建立请求

这里的目的主要是提供真正需要tcp代理透传的服务!

func makeAccept() { var tcpAddr *net.TCPAddr tcpAddr, _ = net.ResolveTCPAddr("tcp", "127.0.0.1:8007") tcpListener, err := net.ListenTCP("tcp", tcpAddr) if err != nil { panic(err) } defer tcpListener.Close() for { tcpConn, err := tcpListener.AcceptTCP() if err != nil { fmt.Println(err) continue } fmt.Println("A client connected 8007:" + tcpConn.RemoteAddr().String()) addConnMathAccept(tcpConn) sendMessage("new\n") } } 这里大家思考一下,如果真的有请求来了,也就是访问8007了,我们怎么办呢?显然我们需要把进来的流量发给127.0.0.1:8000,让它去处理就行了。

这么一想好像很简单的样子,但是好像有问题,那就是我的10.18.10.1是公网ip啊,大家都知道,只有非公网可以主动访问公网,非公网主动访问公网的意思就是好像我们日常访问百度一样。公网是不可以直接跟非公网建立tcp连接的。

那么怎么解决呢?

那就是我们需要先记录下这个进来的8007的tcp连接,然后上面不是说到我们有个tcp连接是一直hold住的,那就是8009那个,服务器每隔2秒发送hi给客户端的。

那么我们可以通过这个8009的tcp链路发送一条消息给客户端,告诉客户端赶紧和我建立一个新的tcp请求吧,为了方便描述,我把"告诉客户端赶紧和我建立一个新的tcp请求"这个新的请求标记为8008链路

这时候就可以把8007的tcp流量发送到这个新建立的tcp链路上。然后把这个新建立的tcp链路的请求发送回去,建立一个读写传输链路即可。

注意这里不能使用8009的tcp链路,8009只是我们用来沟通的链路。

理清楚后,开始编码吧

记录进来的8007的tcp连接,使用一个结构体来存储,这个结构体需要记录accept的tcp连接,也就是8007的tcp链路,和请求的时间,以及8008的链路

刚开始记录的时候8008的链路肯定是nil的,所以设置为nil即可

把它添加到map里面。使用unixNano作为临时key

type ConnMatch struct { accept *net.TCPConn //8007 tcp链路 accept acceptAddTime int64 //接受请求的时间 tunnel *net.TCPConn //8008 tcp链路 tunnel } var connListMap = make(map[string]*ConnMatch) var lock = sync.Mutex{} func addConnMathAccept(accept *net.TCPConn) { //加锁防止竞争读写map lock.Lock() defer lock.Unlock() now := time.Now().UnixNano() connListMap[strconv.FormatInt(now, 10)] = &ConnMatch{accept, time.Now().Unix(), nil} } 告诉客户端赶紧和我建立一个新的tcp请求

这里直接用上面那个cache的tcp链路发送消息即可,不需要太复杂,这里简单定义为new\n即可

........ addConnMathAccept(tcpConn) sendMessage("new\n") } } func sendMessage(message string) { fmt.Println("send Message " + message) if cache != nil { _, e := cache.Write([]byte(message)) if e != nil { fmt.Println("消息发送异常") fmt.Println(e.Error()) } } else { fmt.Println("没有客户端连接,无法发送消息") } } 转发的tcp监听服务 这里我们来创建前面提到的8008tcp连接了,这里的8008端口,也就是前面说的发送new这个消息告诉客户端来和这个8008连接吧

func makeForward() { var tcpAddr *net.TCPAddr tcpAddr, _ = net.ResolveTCPAddr("tcp", "127.0.0.1:8008") tcpListener, err := net.ListenTCP("tcp", tcpAddr) if err != nil { panic(err) } defer tcpListener.Close() fmt.Println("Server ready to read ...") for { tcpConn, err := tcpListener.AcceptTCP() if err != nil { fmt.Println(err) continue } fmt.Println("A client connected 8008 :" + tcpConn.RemoteAddr().String()) configConnListTunnel(tcpConn) } } 然后把8008链路分配到ConnMatch,这两个tcp链路是配对的

var connListMapUpdate = make(chan int) func configConnListTunnel(tunnel *net.TCPConn) { //加锁解决竞争问题 lock.Lock() used := false for _, connMatch := range connListMap { //找到tunnel为nil的而且accept不为nil的connMatch if connMatch.tunnel == nil && connMatch.accept != nil { //填充tunnel链路 connMatch.tunnel = tunnel used = true //这里要break,是防止这条链路被赋值到多个connMatch! break } } if !used { //如果没有被使用的话,则说明所有的connMatch都已经配对好了,直接关闭多余的8008链路 fmt.Println(len(connListMap)) _ = tunnel.Close() fmt.Println("关闭多余的tunnel") } lock.Unlock() //使用channel机制来告诉另一个方法已经就绪 connListMapUpdate <- UPDATE } tcp 转发,这里读取connListMapUpdate这个chain,说明map有更新,需要建立tcpForward隧道

func tcpForward() { for { select { case <-connListMapUpdate: lock.Lock() for key, connMatch := range connListMap { //如果两个都不为空的话,建立隧道连接 if connMatch.tunnel != nil && connMatch.accept != nil { fmt.Println("建立tcpForward隧道连接") go joinConn2(connMatch.accept, connMatch.tunnel) //从map中删除 delete(connListMap, key) } } lock.Unlock() } } } func joinConn2(conn1 *net.TCPConn, conn2 *net.TCPConn) { f := func(local *net.TCPConn, remote *net.TCPConn) { //defer保证close defer local.Close() defer remote.Close() //使用io.Copy传输两个tcp连接, _, err := io.Copy(local, remote) if err != nil { fmt.Println(err.Error()) return } fmt.Println("join Conn2 end") } go f(conn2, conn1) go f(conn1, conn2) } 最后增加一个超时机制,因为会存在这种情况,就是当用户请求8007端口的时候,迟迟等不到配对的connMatch的tunnel链路啊,因为有可能client端挂掉了,导致server不管怎么发送"new"请求,client都无动于衷。

在浏览器看来表现就是一直转着,但是我们不能这样子。

所以当超时的时候,我们主动断掉connMatch中的accept链路即可,设置为5秒

func releaseConnMatch() { for { lock.Lock() for key, connMatch := range connListMap { //如果在指定时间内没有tunnel的话,则释放该连接 if connMatch.tunnel == nil && connMatch.accept != nil { if time.Now().Unix()-connMatch.acceptAddTime > 5 { fmt.Println("释放超时连接") err := connMatch.accept.Close() if err != nil { fmt.Println("释放连接的时候出错了:" + err.Error()) } delete(connListMap, key) } } } lock.Unlock() time.Sleep(5 * time.Second) } } 最后把所有方法整合起来

func main() { //监听控制端口8009 go makeControl() //监听服务端口8007 go makeAccept() //监听转发端口8008 go makeForward() //定时释放连接 go releaseConnMatch() //执行tcp转发 tcpForward() } 客户端代码 连接到服务器的8009控制端口,随时接受服务器的控制请求,随时待命

func connectControl() { var tcpAddr *net.TCPAddr //这里在一台机测试,所以没有连接到公网,可以修改到公网ip tcpAddr, _ = net.ResolveTCPAddr("tcp", "127.0.0.1:8009") conn, err := net.DialTCP("tcp", nil, tcpAddr) if err != nil { fmt.Println("Client connect error ! " + err.Error()) return } fmt.Println(conn.LocalAddr().String() + " : Client connected!8009") reader := bufio.NewReader(conn) for { s, err := reader.ReadString('\n') if err != nil || err == io.EOF { break } else { //接收到new的指令的时候,新建一个tcp连接 if s == "new\n" { go combine() } if s == "hi" { //忽略掉hi的请求 } }

}

} combine方法的代码,整合local和remote的tcp连接

func combine() { local := connectLocal() remote := connectRemote() if local != nil && remote != nil { joinConn(local, remote) } else { if local != nil { err := local.Close() if err!=nil{ fmt.Println("close local:" + err.Error()) } } if remote != nil { err := remote.Close() if err!=nil{ fmt.Println("close remote:" + err.Error()) }

    }
}

} func joinConn(local *net.TCPConn, remote *net.TCPConn) { f := func(local *net.TCPConn, remote *net.TCPConn) { defer local.Close() defer remote.Close() _, err := io.Copy(local, remote) if err != nil { fmt.Println(err.Error()) return } fmt.Println("end") } go f(local, remote) go f(remote, local) } connectLocal 连接到python的8000端口!

func connectLocal() *net.TCPConn { var tcpAddr *net.TCPAddr tcpAddr, _ = net.ResolveTCPAddr("tcp", "127.0.0.1:8000")

conn, err := net.DialTCP("tcp", nil, tcpAddr)

if err != nil {
    fmt.Println("Client connect error ! " + err.Error())
    return nil
}

fmt.Println(conn.LocalAddr().String() + " : Client connected!8000")
return conn

} connectRemote 连接到服务端的8008端口!

func connectRemote() *net.TCPConn { var tcpAddr *net.TCPAddr tcpAddr, _ = net.ResolveTCPAddr("tcp", "127.0.0.1:8008")

conn, err := net.DialTCP("tcp", nil, tcpAddr)

if err != nil {
    fmt.Println("Client connect error ! " + err.Error())
    return nil
}
fmt.Println(conn.LocalAddr().String() + " : Client connected!8008")
return conn;

} 全部整合起来就是

func main() { connectControl() }


物联网(IoT)技术的发展,离不开多种通信协议的支持。这些协议在数据传输、设备连接和管理等方面起到了至关重要的作用。本文将介绍和对比8种常见的物联网协议,帮助理解它们的特点及适用场景。

  1. MQTT(Message Queuing Telemetry Transport) 概述 MQTT是一种轻量级的消息传输协议,设计用于低带宽和不稳定网络环境下的设备通信。

特点 轻量级:协议头部非常小,适合资源受限的设备。 发布/订阅模型:支持松耦合的消息通信方式。 QoS(服务质量) :提供三种服务质量等级(0, 1, 2)。 应用场景 适用于物联网设备的远程监控、传感器数据传输等。

  1. CoAP(Constrained Application Protocol) 概述 CoAP是一种专为低功耗、低带宽网络设计的协议,基于REST架构,类似于HTTP。

特点 轻量级:适合资源受限设备。 RESTful架构:支持GET、POST、PUT、DELETE操作。 UDP传输:减少传输开销。 应用场景 适用于智能家居、工业控制等需要简单请求/响应机制的场景。

  1. AMQP(Advanced Message Queuing Protocol) 概述 AMQP是一种开源的标准应用层协议,用于消息中间件,支持复杂的消息传递模式。

特点 可靠性:确保消息的可靠传递。 灵活性:支持多种消息传递模式。 互操作性:不同厂商的实现可以互操作。 应用场景 适用于金融服务、企业消息系统等需要高可靠性和灵活性的场景。

  1. XMPP(Extensible Messaging and Presence Protocol) 概述 XMPP是一种基于XML的协议,最初用于即时消息传递,现也用于物联网设备的通信。

特点 实时性:支持即时通信。 扩展性:通过XMPP扩展协议(XEPs)扩展功能。 安全性:支持TLS加密。 应用场景 适用于聊天应用、实时通知以及物联网设备之间的实时通信。

  1. DDS(Data Distribution Service) 概述 DDS是一种面向实时系统的数据传输协议,支持发布/订阅模式,适合高性能、低延迟的应用。

特点 实时性:高效的数据分发和低延迟。 可扩展性:支持大规模分布式系统。 QoS策略:多种服务质量策略。 应用场景 适用于自动驾驶、航空航天等需要高实时性和高可靠性的场景。

  1. Zigbee 概述 Zigbee是一种低功耗、低数据速率的无线通信协议,主要用于家庭自动化和工业控制。

特点 低功耗:适合电池供电设备。 自组网:支持设备自动加入网络。 安全性:支持AES加密。 应用场景 适用于智能家居、工业自动化等需要低功耗无线通信的场景。

  1. Z-Wave 概述 Z-Wave是一种用于家庭自动化的无线通信协议,具有低功耗、可靠性高的特点。

特点 低功耗:适合长时间运行的设备。 中继功能:设备可以作为中继器,增强信号覆盖。 广泛支持:得到多家厂商支持,设备兼容性好。 应用场景 适用于家庭自动化系统,如智能灯光控制、安全监控等。

  1. Bluetooth Low Energy(BLE) 概述 BLE是一种用于短距离数据通信的低功耗蓝牙技术,适合电池供电的设备。

特点 低功耗:延长电池寿命。 广泛支持:内置于大多数智能手机和移动设备。 适应性强:支持广播和连接模式。 应用场景 适用于可穿戴设备、医疗设备、智能家居等需要短距离低功耗通信的场景。

分析说明表 协议 特点 应用场景 MQTT 轻量级、发布/订阅模型、QoS支持 远程监控、传感器数据传输 CoAP 轻量级、RESTful架构、UDP传输 智能家居、工业控制 AMQP 高可靠性、灵活性、互操作性 金融服务、企业消息系统 XMPP 实时性、扩展性、安全性 聊天应用、实时通知、设备通信 DDS 高实时性、可扩展性、QoS策略 自动驾驶、航空航天 Zigbee 低功耗、自组网、安全性 智能家居、工业自动化 Z-Wave 低功耗、中继功能、广泛支持 家庭自动化系统 BLE 低功耗、广泛支持、适应性强 可穿戴设备、医疗设备、智能家居 总结 物联网协议的选择需要考虑多方面的因素,包括设备资源、网络环境、数据传输要求和应用场景。本文介绍的8种物联网协议各有其优势和适用领域:

MQTT和CoAP适合资源受限的设备和网络环境。 AMQP和DDS提供高可靠性和高性能的数据传输,适用于对实时性和可靠性要求较高的场景。 XMPP适用于需要实时通信的应用,如聊天和通知。 Zigbee和Z-Wave专注于低功耗和家庭自动化应用。 BLE适合短距离、低功耗的通信需求,如可穿戴设备和智能家居。 根据具体的应用需求,选择合适的协议可以大幅提升系统的性能和可靠性。希望本文能为您在物联网协议的选择和应用中提供有价值的参考。