-
Notifications
You must be signed in to change notification settings - Fork 21
Expand file tree
/
Copy pathserver_side_for_http_push.py
More file actions
171 lines (127 loc) · 5.13 KB
/
server_side_for_http_push.py
File metadata and controls
171 lines (127 loc) · 5.13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
#!/usr/bin/env python3
"""
server_side_for_http_push.py — Test server for prometheus-cpp-lite HTTP push examples.
Accepts POST, PUT, and DELETE requests, prints their details to the console,
and returns a 200 OK response. Designed to work with:
- provide_via_http_push_simple.cpp (simple periodic POST)
- provide_via_http_push_advanced.cpp (POST, PUT, DELETE, async)
Usage:
python server_side_for_http_push.py [port]
Default port is 9091. Press Ctrl+C to stop.
"""
import os
import sys
from http.server import HTTPServer, BaseHTTPRequestHandler
from datetime import datetime
# =============================================================================
# ANSI color support
# =============================================================================
def _enable_ansi_colors():
    """
    Detect whether the terminal supports ANSI escape codes.

    On Windows 10+ (build 14393+), enable Virtual Terminal Processing
    via the Win32 API so that ANSI codes work in cmd.exe and PowerShell.

    Returns:
        True if ANSI colors are available, False otherwise. Detection is
        best-effort and never raises: any failure means "no colors".
    """
    # No usable terminal -> no colors. sys.stdout can be None (e.g. when
    # started without a console), closed, or replaced by an object that has
    # no isatty(); all of these are treated like redirected output.
    stream = sys.stdout
    try:
        is_terminal = stream is not None and stream.isatty()
    except (AttributeError, ValueError):
        # AttributeError: replacement stream without isatty().
        # ValueError: the stream has already been closed.
        is_terminal = False
    if not is_terminal:
        return False

    # Unix terminals generally support ANSI natively.
    if os.name != "nt":
        return True

    # Windows: try to enable VT100 processing on the console handle.
    try:
        import ctypes
        from ctypes import wintypes

        kernel32 = ctypes.windll.kernel32
        STD_OUTPUT_HANDLE = -11
        ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x0004

        handle = kernel32.GetStdHandle(STD_OUTPUT_HANDLE)
        if handle == -1:  # INVALID_HANDLE_VALUE
            return False

        mode = wintypes.DWORD()
        if not kernel32.GetConsoleMode(handle, ctypes.byref(mode)):
            return False

        # Only touch the console mode when the flag is not already set.
        if not (mode.value & ENABLE_VIRTUAL_TERMINAL_PROCESSING):
            new_mode = mode.value | ENABLE_VIRTUAL_TERMINAL_PROCESSING
            if not kernel32.SetConsoleMode(handle, new_mode):
                return False
        return True
    except Exception:
        # Deliberate catch-all: color support is cosmetic, so any problem
        # while probing the console falls back to plain output.
        return False
# Probe the terminal once, at import time; _c() reads this flag to decide
# whether an escape code is kept or replaced by an empty string.
_COLORS_ENABLED = _enable_ansi_colors()
def _c(code):
    """Pass *code* through unchanged when colors are enabled; otherwise give back ""."""
    if _COLORS_ENABLED:
        return code
    return ""
# Color constants (resolve once at import time).
# Each one is either the escape sequence itself or "" when colors are
# disabled, so they can be embedded in f-strings unconditionally.
COLOR_RESET = _c("\033[0m")
COLOR_GREEN = _c("\033[32m")
COLOR_YELLOW = _c("\033[33m")
COLOR_RED = _c("\033[31m")
COLOR_CYAN = _c("\033[36m")
COLOR_DIM = _c("\033[2m")
# Color used for the HTTP method name when a request is printed; the handler
# falls back to COLOR_CYAN for any method that is not listed here.
METHOD_COLORS = {
    "POST": COLOR_GREEN,
    "PUT": COLOR_YELLOW,
    "DELETE": COLOR_RED,
}
# =============================================================================
# HTTP handler
# =============================================================================
class PushgatewayHandler(BaseHTTPRequestHandler):
    """Handles POST, PUT, and DELETE requests, mimicking a Prometheus Pushgateway.

    Each request is echoed to the console (timestamp, method, path, declared
    Content-Length and body) and answered with ``200 OK``. A request whose
    Content-Length header is not a non-negative integer is answered with
    ``400 Bad Request`` and its body is not read.
    """

    @staticmethod
    def _parse_content_length(raw):
        """Convert a raw Content-Length header value to an int.

        Args:
            raw: The header value as received, or None when the header is absent.

        Returns:
            0 when the header is absent, the parsed length when it is a
            non-negative integer, and None when the value is malformed.
        """
        if raw is None:
            return 0
        try:
            length = int(raw)
        except (TypeError, ValueError):
            return None
        return length if length >= 0 else None

    def _print_body(self, body):
        """Print the request body line by line, dimming lines that start with '#'."""
        if not body:
            print(f" {COLOR_DIM}(empty body){COLOR_RESET}")
            return
        print(" Body:")
        for line in body.splitlines():
            line_color = COLOR_DIM if line.startswith("#") else COLOR_CYAN
            print(f" {line_color}{line}{COLOR_RESET}")

    def _handle_request(self, method):
        """Log one request to the console and send the response.

        Args:
            method: HTTP method name used for display ("POST", "PUT", "DELETE").
        """
        raw_length = self.headers.get("Content-Length")
        content_length = self._parse_content_length(raw_length)

        color = METHOD_COLORS.get(method, COLOR_CYAN)
        # Drop the last three digits of the microseconds -> millisecond precision.
        timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
        separator = "-" * 60

        print(f"\n{COLOR_DIM}[{timestamp}]{COLOR_RESET} "
              f"{color}{method}{COLOR_RESET} {self.path}")

        if content_length is None:
            # Without a trustworthy length the body cannot be read safely;
            # report the problem instead of letting int() raise mid-request.
            print(f" {COLOR_RED}Invalid Content-Length: {raw_length!r}{COLOR_RESET}")
            print(f" {COLOR_DIM}{separator}{COLOR_RESET}")
            self.send_error(400, "Invalid Content-Length header")
            return

        if content_length > 0:
            body = self.rfile.read(content_length).decode("utf-8", errors="replace")
        else:
            body = ""

        print(f" Content-Length: {content_length}")
        self._print_body(body)
        print(f" {COLOR_DIM}{separator}{COLOR_RESET}")

        # Send 200 OK response. Content-Length lets the client know where the
        # body ends without having to wait for the connection to close.
        payload = b"OK"
        self.send_response(200)
        self.send_header("Content-Type", "text/plain")
        self.send_header("Content-Length", str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    def do_POST(self):
        """Handle an HTTP POST request."""
        self._handle_request("POST")

    def do_PUT(self):
        """Handle an HTTP PUT request."""
        self._handle_request("PUT")

    def do_DELETE(self):
        """Handle an HTTP DELETE request."""
        self._handle_request("DELETE")

    # The parameter is named ``format`` to match the base-class signature.
    def log_message(self, format, *args):
        """Suppress default access log — we print our own formatted output."""
# =============================================================================
# Main
# =============================================================================
_DEFAULT_PORT = 9091


def _parse_port(argv):
    """Return the port given as argv[1], or the default when absent; exit on invalid input."""
    if len(argv) < 2:
        return _DEFAULT_PORT
    try:
        port = int(argv[1])
    except ValueError:
        sys.exit(f"Invalid port {argv[1]!r}: expected an integer between 0 and 65535.")
    if not 0 <= port <= 65535:
        sys.exit(f"Invalid port {port}: expected an integer between 0 and 65535.")
    return port


def main(argv=None):
    """Run the test server until interrupted with Ctrl+C.

    Args:
        argv: Command-line arguments including the program name; defaults to
            ``sys.argv``. ``argv[1]``, when present, is the port to listen on
            (default 9091).

    Raises:
        SystemExit: If the port argument is not a valid port number or the
            server cannot bind to the requested port.
    """
    port = _parse_port(sys.argv if argv is None else argv)

    try:
        server = HTTPServer(("localhost", port), PushgatewayHandler)
    except OSError as exc:
        # Typically "address already in use"; a one-line message is more
        # useful here than a traceback.
        sys.exit(f"Cannot listen on localhost:{port}: {exc}")

    # The context manager closes the listening socket on every exit path,
    # not only after Ctrl+C.
    with server:
        # Report the port actually bound (differs from the request for port 0).
        port = server.server_address[1]

        banner_line = "=" * 60
        print(banner_line)
        print(" Prometheus Pushgateway test server")
        print(f" Listening on http://localhost:{port}")
        print(f" Accepts {COLOR_GREEN}POST{COLOR_RESET}, "
              f"{COLOR_YELLOW}PUT{COLOR_RESET}, "
              f"{COLOR_RED}DELETE{COLOR_RESET} requests")
        print(" Press Ctrl+C to stop")
        print(banner_line)

        try:
            server.serve_forever()
        except KeyboardInterrupt:
            print(f"\n{COLOR_DIM}Server stopped.{COLOR_RESET}")
# Start the server only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()