podman

Форк
0
138 строк · 4.4 Кб
1
#!/usr/bin/env python
2

3
# This will create golden files in a directory passed to it.
4
# A Test calls this internally to create the golden files
5
# So it can process them (so we don't have to checkin the files).
6

7
# Ensure msgpack-python and cbor are installed first, using:
8
#   sudo apt install python-dev (may not be necessary)
9
#   sudo apt install python-pip # or python3-pip
10
#   pip install --user msgpack-python msgpack-rpc-python cbor
11

12
# Ensure all "string" keys are utf strings (else encoded as bytes)
13

14
from __future__ import print_function
15
import cbor, msgpack, msgpackrpc, sys, os, threading
16

17
mylocaladdr="127.0.0.1" # localhost.localdomain localhost 127.0.0.1
18

19
def get_test_data_list():
20
    # get list with all primitive types, and a combo type
21
    l0 = [ 
22
        -8,
23
         -1616,
24
         -32323232,
25
         -6464646464646464,
26
         192,
27
         1616,
28
         32323232,
29
         6464646464646464,
30
         192,
31
         -3232.0,
32
         -6464646464.0,
33
         3232.0,
34
         6464.0,
35
         6464646464.0,
36
         160.0,
37
         1616.0,
38
         False,
39
         True,
40
         u"null",
41
         None,
42
         u"some&day>some<day",
43
         1328176922000002000,
44
         u"",
45
         -2206187877999998000,
46
         u"bytestring",
47
         270,
48
         u"none",
49
        -2013855847999995777,
50
         #-6795364578871345152,
51
         ]
52
    l1 = [
53
        { "true": True,
54
          "false": False },
55
        { "true": u"True",
56
          "false": False,
57
          "uint16(1616)": 1616 },
58
        { "list": [1616, 32323232, True, -3232.0, {"TRUE":True, "FALSE":False}, [True, False] ],
59
          "int32":32323232, "bool": True, 
60
          "LONG STRING": u"123456789012345678901234567890123456789012345678901234567890",
61
          "SHORT STRING": u"1234567890" },
62
        { True: "true", 138: False, "false": 200 }
63
        ]
64
    
65
    l = []
66
    l.extend(l0)
67
    l.append(l0)
68
    l.append(1)
69
    l.extend(l1)
70
    return l
71

72
def build_test_data(destdir):
73
    l = get_test_data_list()
74
    for i in range(len(l)):
75
        # packer = msgpack.Packer()
76
        serialized = msgpack.dumps(l[i])
77
        with open(os.path.join(destdir, str(i) + '.msgpack.golden'), 'wb') as f:
78
            f.write(serialized)
79
        serialized = cbor.dumps(l[i])
80
        with open(os.path.join(destdir, str(i) + '.cbor.golden'), 'wb') as f:
81
            f.write(serialized)
82

83
def doRpcServer(port, stopTimeSec):
84
    class EchoHandler(object):
85
        def Echo123(self, msg1, msg2, msg3):
86
            return ("1:%s 2:%s 3:%s" % (msg1.decode("utf-8"), msg2.decode("utf-8"), msg3.decode("utf-8")))
87
        def EchoStruct(self, msg):
88
            return ("%s" % msg)
89
    
90
    addr = msgpackrpc.Address(mylocaladdr, port)
91
    server = msgpackrpc.Server(EchoHandler())
92
    server.listen(addr)
93
    # run thread to stop it after stopTimeSec seconds if > 0
94
    if stopTimeSec > 0:
95
        def myStopRpcServer():
96
            server.stop()
97
        t = threading.Timer(stopTimeSec, myStopRpcServer)
98
        t.start()
99
    server.start()
100

101
def doRpcClientToPythonSvc(port):
102
    address = msgpackrpc.Address(mylocaladdr, port)
103
    client = msgpackrpc.Client(address, unpack_encoding='utf-8')
104
    print(client.call("Echo123", "A1", "B2", "C3"))
105
    print(client.call("EchoStruct", {"A" :"Aa", "B":"Bb", "C":"Cc"}))
106

107
# def doCheckSocket(port):
108
#     print(">>>> port: ", port, " <<<<<")
109
#     sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
110
#     result = sock.connect_ex(('127.0.0.1', port))
111
#     if result == 0:
112
#         print("\t>>>> Port is open")
113
#     else:
114
#         print("\t>>>> Port is not open")
115
#     sock.close()
116
    
117
def doRpcClientToGoSvc(port):
118
    # doCheckSocket(port)
119
    address = msgpackrpc.Address(mylocaladdr, port)
120
    client = msgpackrpc.Client(address, unpack_encoding='utf-8')
121
    print(client.call("TestRpcInt.Echo123", ["A1", "B2", "C3"]))
122
    print(client.call("TestRpcInt.EchoStruct", {"A" :"Aa", "B":"Bb", "C":"Cc"}))
123

124
def doMain(args):
125
    if len(args) == 2 and args[0] == "testdata":
126
        build_test_data(args[1])
127
    elif len(args) == 3 and args[0] == "rpc-server":
128
        doRpcServer(int(args[1]), int(args[2]))
129
    elif len(args) == 2 and args[0] == "rpc-client-python-service":
130
        doRpcClientToPythonSvc(int(args[1]))
131
    elif len(args) == 2 and args[0] == "rpc-client-go-service":
132
        doRpcClientToGoSvc(int(args[1]))
133
    else:
134
        print("Usage: test.py " + 
135
              "[testdata|rpc-server|rpc-client-python-service|rpc-client-go-service] ...")
136
    
137
if __name__ == "__main__":
138
    doMain(sys.argv[1:])
139

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.