Changeset View
Changeset View
Standalone View
Standalone View
test/functional/test_framework/chronik/client.py
Show All 20 Lines | class ChronikResponse: | ||||
def __init__(self, status: int, *, ok_proto=None, error_proto=None) -> None: | def __init__(self, status: int, *, ok_proto=None, error_proto=None) -> None: | ||||
self.status = status | self.status = status | ||||
self.ok_proto = ok_proto | self.ok_proto = ok_proto | ||||
self.error_proto = error_proto | self.error_proto = error_proto | ||||
def ok(self): | def ok(self): | ||||
if self.status != 200: | if self.status != 200: | ||||
raise AssertionError( | raise AssertionError( | ||||
f'Expected OK response, but got status {self.status}, error: ' | f"Expected OK response, but got status {self.status}, error: " | ||||
f'{self.error_proto}') | f"{self.error_proto}" | ||||
) | |||||
return self.ok_proto | return self.ok_proto | ||||
def err(self, status: int): | def err(self, status: int): | ||||
if self.status == 200: | if self.status == 200: | ||||
raise AssertionError( | raise AssertionError( | ||||
f'Expected error response status {status}, but got OK: {self.ok_proto}') | f"Expected error response status {status}, but got OK: {self.ok_proto}" | ||||
) | |||||
if self.status != status: | if self.status != status: | ||||
raise AssertionError( | raise AssertionError( | ||||
f'Expected error response status {status}, but got different error ' | f"Expected error response status {status}, but got different error " | ||||
f'status {self.status}, error: {self.error_proto}') | f"status {self.status}, error: {self.error_proto}" | ||||
) | |||||
return self.error_proto | return self.error_proto | ||||
class ChronikScriptClient: | class ChronikScriptClient: | ||||
def __init__(self, client: 'ChronikClient', script_type: str, | def __init__( | ||||
script_payload: str) -> None: | self, client: "ChronikClient", script_type: str, script_payload: str | ||||
) -> None: | |||||
self.client = client | self.client = client | ||||
self.script_type = script_type | self.script_type = script_type | ||||
self.script_payload = script_payload | self.script_payload = script_payload | ||||
def confirmed_txs(self, page=None, page_size=None): | def confirmed_txs(self, page=None, page_size=None): | ||||
query = _page_query_params(page, page_size) | query = _page_query_params(page, page_size) | ||||
return self.client._request_get( | return self.client._request_get( | ||||
f'/script/{self.script_type}/{self.script_payload}/confirmed-txs{query}', | f"/script/{self.script_type}/{self.script_payload}/confirmed-txs{query}", | ||||
pb.TxHistoryPage) | pb.TxHistoryPage, | ||||
) | |||||
def history(self, page=None, page_size=None): | def history(self, page=None, page_size=None): | ||||
query = _page_query_params(page, page_size) | query = _page_query_params(page, page_size) | ||||
return self.client._request_get( | return self.client._request_get( | ||||
f'/script/{self.script_type}/{self.script_payload}/history{query}', | f"/script/{self.script_type}/{self.script_payload}/history{query}", | ||||
pb.TxHistoryPage) | pb.TxHistoryPage, | ||||
) | |||||
def unconfirmed_txs(self): | def unconfirmed_txs(self): | ||||
return self.client._request_get( | return self.client._request_get( | ||||
f'/script/{self.script_type}/{self.script_payload}/unconfirmed-txs', | f"/script/{self.script_type}/{self.script_payload}/unconfirmed-txs", | ||||
pb.TxHistoryPage) | pb.TxHistoryPage, | ||||
) | |||||
def utxos(self): | def utxos(self): | ||||
return self.client._request_get( | return self.client._request_get( | ||||
f'/script/{self.script_type}/{self.script_payload}/utxos', | f"/script/{self.script_type}/{self.script_payload}/utxos", pb.ScriptUtxos | ||||
pb.ScriptUtxos) | ) | ||||
class ChronikWs: | class ChronikWs: | ||||
def __init__(self, ws) -> None: | def __init__(self, ws) -> None: | ||||
self.ws = ws | self.ws = ws | ||||
def recv(self): | def recv(self): | ||||
data = self.ws.recv() | data = self.ws.recv() | ||||
Show All 12 Lines | def sub_script(self, script_type: str, payload: bytes, *, is_unsub=False) -> None: | ||||
sub = pb.WsSub( | sub = pb.WsSub( | ||||
is_unsub=is_unsub, | is_unsub=is_unsub, | ||||
script=pb.WsSubScript(script_type=script_type, payload=payload), | script=pb.WsSubScript(script_type=script_type, payload=payload), | ||||
) | ) | ||||
self.send_bytes(sub.SerializeToString()) | self.send_bytes(sub.SerializeToString()) | ||||
class ChronikClient: | class ChronikClient: | ||||
CONTENT_TYPE = 'application/x-protobuf' | CONTENT_TYPE = "application/x-protobuf" | ||||
def __init__(self, host: str, port: int, timeout=DEFAULT_TIMEOUT) -> None: | def __init__(self, host: str, port: int, timeout=DEFAULT_TIMEOUT) -> None: | ||||
self.host = host | self.host = host | ||||
self.port = port | self.port = port | ||||
self.timeout = timeout | self.timeout = timeout | ||||
def _request_get(self, path: str, pb_type): | def _request_get(self, path: str, pb_type): | ||||
kwargs = {} | kwargs = {} | ||||
if self.timeout is not None: | if self.timeout is not None: | ||||
kwargs['timeout'] = self.timeout | kwargs["timeout"] = self.timeout | ||||
client = http.client.HTTPConnection(self.host, self.port, **kwargs) | client = http.client.HTTPConnection(self.host, self.port, **kwargs) | ||||
client.request('GET', path) | client.request("GET", path) | ||||
response = client.getresponse() | response = client.getresponse() | ||||
content_type = response.getheader('Content-Type') | content_type = response.getheader("Content-Type") | ||||
body = response.read() | body = response.read() | ||||
if content_type != self.CONTENT_TYPE: | if content_type != self.CONTENT_TYPE: | ||||
raise UnexpectedContentType( | raise UnexpectedContentType( | ||||
f'Unexpected Content-Type "{content_type}" (expected ' | f'Unexpected Content-Type "{content_type}" (expected ' | ||||
f'"{self.CONTENT_TYPE}"), body: {repr(body)}' | f'"{self.CONTENT_TYPE}"), body: {repr(body)}' | ||||
) | ) | ||||
if response.status != 200: | if response.status != 200: | ||||
proto_error = pb.Error() | proto_error = pb.Error() | ||||
proto_error.ParseFromString(body) | proto_error.ParseFromString(body) | ||||
return ChronikResponse(response.status, error_proto=proto_error) | return ChronikResponse(response.status, error_proto=proto_error) | ||||
ok_proto = pb_type() | ok_proto = pb_type() | ||||
ok_proto.ParseFromString(body) | ok_proto.ParseFromString(body) | ||||
return ChronikResponse(response.status, ok_proto=ok_proto) | return ChronikResponse(response.status, ok_proto=ok_proto) | ||||
def blockchain_info(self) -> ChronikResponse: | def blockchain_info(self) -> ChronikResponse: | ||||
return self._request_get('/blockchain-info', pb.BlockchainInfo) | return self._request_get("/blockchain-info", pb.BlockchainInfo) | ||||
def block(self, hash_or_height: Union[str, int]) -> ChronikResponse: | def block(self, hash_or_height: Union[str, int]) -> ChronikResponse: | ||||
return self._request_get(f'/block/{hash_or_height}', pb.Block) | return self._request_get(f"/block/{hash_or_height}", pb.Block) | ||||
def block_txs(self, hash_or_height: Union[str, int], | def block_txs( | ||||
page=None, page_size=None) -> ChronikResponse: | self, hash_or_height: Union[str, int], page=None, page_size=None | ||||
) -> ChronikResponse: | |||||
query = _page_query_params(page, page_size) | query = _page_query_params(page, page_size) | ||||
return self._request_get( | return self._request_get( | ||||
f'/block-txs/{hash_or_height}{query}', pb.TxHistoryPage) | f"/block-txs/{hash_or_height}{query}", pb.TxHistoryPage | ||||
) | |||||
def blocks(self, start_height: int, end_height: int) -> ChronikResponse: | def blocks(self, start_height: int, end_height: int) -> ChronikResponse: | ||||
return self._request_get(f'/blocks/{start_height}/{end_height}', pb.Blocks) | return self._request_get(f"/blocks/{start_height}/{end_height}", pb.Blocks) | ||||
def tx(self, txid: str) -> ChronikResponse: | def tx(self, txid: str) -> ChronikResponse: | ||||
return self._request_get(f'/tx/{txid}', pb.Tx) | return self._request_get(f"/tx/{txid}", pb.Tx) | ||||
def raw_tx(self, txid: str) -> bytes: | def raw_tx(self, txid: str) -> bytes: | ||||
return self._request_get(f'/raw-tx/{txid}', pb.RawTx) | return self._request_get(f"/raw-tx/{txid}", pb.RawTx) | ||||
def script(self, script_type: str, script_payload: str) -> ChronikScriptClient: | def script(self, script_type: str, script_payload: str) -> ChronikScriptClient: | ||||
return ChronikScriptClient(self, script_type, script_payload) | return ChronikScriptClient(self, script_type, script_payload) | ||||
def ws(self, *, timeout=None) -> ChronikWs: | def ws(self, *, timeout=None) -> ChronikWs: | ||||
ws = websocket.WebSocket() | ws = websocket.WebSocket() | ||||
ws.connect(f'ws://{self.host}:{self.port}/ws', timeout=timeout) | ws.connect(f"ws://{self.host}:{self.port}/ws", timeout=timeout) | ||||
return ChronikWs(ws) | return ChronikWs(ws) | ||||
def _page_query_params(page=None, page_size=None) -> str: | def _page_query_params(page=None, page_size=None) -> str: | ||||
if page is not None and page_size is not None: | if page is not None and page_size is not None: | ||||
return f'?page={page}&page_size={page_size}' | return f"?page={page}&page_size={page_size}" | ||||
elif page is not None: | elif page is not None: | ||||
return f'?page={page}' | return f"?page={page}" | ||||
elif page_size is not None: | elif page_size is not None: | ||||
return f'?page_size={page_size}' | return f"?page_size={page_size}" | ||||
else: | else: | ||||
return '' | return "" |