# Python code for WLAN configuration of ESP32 and Raspberry Pico
# Anders Fongen, 2023 anders@fongen.no

try:
    import network
except ImportError:
    # Not running under MicroPython (e.g. host-side testing of the
    # pure-Python helpers). The WLAN methods cannot be used without it.
    network = None
import socket
import time
import json


class wlan_config:
    """Bring up the WLAN interface, as a station or as an access point."""

    POLL_INTERVAL_MS = 500  # Delay between two status polls
    MAX_POLLS = 20          # 20 polls * 500 ms = give up after 10 seconds

    def __init__(self):
        # Set by connect_wlan()/make_ap(); None until one of them has run
        self.sta_if = None
        self.active_ssid = None

    def connect_wlan(self, known_networks):
        """Connect to the strongest known network that is observed right now.

        known_networks is a list of (ssid, pw) tuples. The candidates are
        tried in order of descending RSSI. Returns True as soon as one of
        them hands out an IP address, False if none of them did.
        """
        self.sta_if = network.WLAN(network.STA_IF)
        # ESP32 Comment out the next line
        self.sta_if.deinit()  # Form of reset
        self.sta_if.active(True)
        observed_networks = self.sta_if.scan()
        # Merge the two lists
        interesting_networks = self._merge(known_networks, observed_networks)
        rejected = (network.STAT_NO_AP_FOUND, network.STAT_WRONG_PASSWORD)
        for (rssi, ssid, pw) in interesting_networks:
            # ESP32 Comment out the next line
            self.sta_if.deinit()  # Form of reset
            self.sta_if.active(True)
            self.sta_if.connect(ssid, pw)
            status = self.sta_if.status()
            for _ in range(self.MAX_POLLS):
                # Stop polling on success and on a definitive rejection
                if status == network.STAT_GOT_IP or status in rejected:
                    break
                time.sleep_ms(self.POLL_INTERVAL_MS)
                status = self.sta_if.status()
            if status == network.STAT_GOT_IP:
                print("Successful connection to ", ssid)
                self.active_ssid = ssid
                return True  # Successful connect
            if status in rejected:
                print("Rejected from ", ssid, "reason=", status)
            else:
                print("Give up ", ssid, "reason=", status)
            # Fall through: try the next candidate network
        return False

    def which_ssid(self):
        """Return the SSID in use, or None if nothing has been set up yet."""
        return self.active_ssid

    def which_ip(self):
        """Return the ifconfig() result of the active interface."""
        return self.sta_if.ifconfig()

    # Merge the list of observed networks and known networks,
    # the resulting list will be only the known networks which
    # are observed at the moment, sorted by descending rssi
    def _merge(self, known_list, observed_list):
        """Return (rssi, ssid, pw) tuples for the known networks in range.

        known_list holds (ssid, pw) tuples. Each entry of observed_list is
        indexed like a WLAN.scan() result: SSID as bytes at index 0 and
        RSSI at index 3.
        """
        new_list = list()
        for (ssid, pw) in known_list:
            # Compare as bytes, so an undecodable SSID broadcast by some
            # foreign network cannot abort the merge
            wanted = ssid.encode()
            for obs in observed_list:
                if obs[0] == wanted:
                    new_list.append((obs[3], ssid, pw))
                    break  # Out of inner for loop
        # Final list should be sorted on RSSI
        new_list.sort(reverse=True)
        return new_list

    def make_ap(self, ssid="RPico-config", pw="123456789"):
        """Start an access point with the given SSID and password.

        Blocks until the interface reports itself as active.
        """
        # NOTE: the attribute keeps the name sta_if although it holds the
        # AP interface here, because which_ip() reads that attribute.
        self.sta_if = network.WLAN(network.AP_IF)
        # ESP32 Comment out the next line
        self.sta_if.deinit()  # Form of reset
        # FOR ESP32: Reverse the order of two next lines
        self.sta_if.config(essid=ssid, password=pw)
        self.sta_if.active(True)
        self.active_ssid = ssid
        while not self.sta_if.active():
            time.sleep_ms(500)
        # Access point now active
        print("Access point active")


class web_config:
    """Minimal HTTP server: serves one page on GET / and accepts POST /."""

    _HEX_DIGITS = "0123456789abcdefABCDEF"

    def __init__(self, get_response, post_handler):
        self.get_response = get_response  # Function returning the GET page
        self.post_handler = post_handler  # Function(dict) returning a page

    # Parse a http query string and return named values as a dictionary
    def _parse_parameters(self, par_string):
        """Decode an application/x-www-form-urlencoded string into a dict.

        Empty elements are skipped and a name without '=' maps to ''.
        """
        params = dict()
        for element in par_string.split('&'):
            if not element:
                continue  # Empty body, or a stray '&'
            # Split on the first '=' only; the value may contain more
            nv = element.split('=', 1)
            raw_value = nv[1] if len(nv) == 2 else ''
            # Form encoding sends a space as '+'. This has to be undone
            # before the %hex unescape, which may produce a literal '+'.
            name = self._unescape(nv[0].replace('+', ' '))
            params[name] = self._unescape(raw_value.replace('+', ' '))
        return params

    # Replace e.g. %2f with /
    def _unescape(self, s):
        """Return s with every %XX escape replaced by the byte it encodes.

        The input is scanned once, so decoded output is never decoded
        again. A '%' that is not followed by two hex digits is copied
        unchanged. The resulting bytes are decoded as UTF-8.
        """
        out = bytearray()
        ix = 0
        while ix < len(s):
            ch = s[ix]
            if ch == '%':
                hx = s[ix + 1:ix + 3]
                if (len(hx) == 2 and hx[0] in self._HEX_DIGITS
                        and hx[1] in self._HEX_DIGITS):
                    out.append(int(hx, 16))
                    ix += 3
                    continue
            out.extend(ch.encode())
            ix += 1
        return bytes(out).decode()

    def _read_request(self, client_socket):
        """Read from the socket until it goes quiet and return the text."""
        client_socket.settimeout(1)  # Set socket timeout 1 sec
        chunks = []
        try:
            while True:
                buf = client_socket.recv(1024)
                if not buf:
                    break  # Peer closed the connection
                chunks.append(buf)
        except OSError:
            pass  # Timeout: take what has arrived as the whole request
        # Decode once, so a multi-byte character split over two chunks
        # does not fail
        return b''.join(chunks).decode()

    def _handle_request(self, request):
        """Map a request text to a (response_code, response) tuple."""
        if request.startswith("GET / "):
            return "200 OK", self.get_response()
        if request.startswith("POST / "):
            print(request)  # Just for debugging
            # Find where the html body starts (after an empty line)
            ix = request.find('\r\n\r\n')
            if ix == -1:
                # Garbage: refuse this request, but keep the server alive
                return "400 Bad Request", '\n\n400 Bad Request\n\n'
            post_parameters = self._parse_parameters(request[ix + 4:])
            print(post_parameters)  # For debugging only
            # Send the post parameters to a HANDLER, which returns
            # a response message
            return "200 OK", self.post_handler(post_parameters)
        return "404 Not Found", '\n\n404 Not Found\n\n'

    def _send_all(self, client_socket, data):
        """Call send() until every byte of data is written."""
        while data:
            sent = client_socket.send(data)
            data = data[sent:]

    def _send_response(self, client_socket, response_code, response):
        """Send the status line, the headers and the response body."""
        header = ('HTTP/1.0 %s \r\nContent-type: text/html\r\n\r\n'
                  % response_code)
        self._send_all(client_socket, header.encode())
        if isinstance(response, str):
            response = response.encode()
        self._send_all(client_socket, response)

    def _serve_client(self, client_socket):
        """Read one request from the client and answer it."""
        try:
            request = self._read_request(client_socket)
            response_code, response = self._handle_request(request)
        except Exception as e:
            # A failing handler must not take the server down
            print("Request failed:", e)
            response_code = "500 Internal Server Error"
            response = "\n\n500 Internal Server Error\n\n"
        try:
            self._send_response(client_socket, response_code, response)
        except OSError as e:
            # The client went away; nothing more to do for this request
            print("Could not send response:", e)

    def start_web_server(self, port=80):
        """Serve HTTP requests on the given port, one client at a time.

        Loops forever. The listening socket is closed if an exception
        ends the loop.
        """
        sock = socket.socket()
        try:
            sock.bind(('0.0.0.0', port))
            sock.listen()
            print('listening on port', port)
            # Now enter main loop. Accept connections and process
            # http requests
            while True:
                print("Waiting for incoming connection")
                client_socket, cl_addr = sock.accept()
                print('Client connection from', cl_addr)
                try:
                    self._serve_client(client_socket)
                finally:
                    client_socket.close()
        finally:
            sock.close()


class data_storage_handler:
    """Keep the configuration dictionary in a JSON file."""

    def __init__(self, filename):
        self.filename = filename

    def store_json(self, d):
        """Write the dictionary d to the file as JSON."""
        with open(self.filename, "w") as f:
            json.dump(d, f)

    def load_json(self):
        """Return the stored dictionary, or {} if there is none.

        A missing or unreadable file, invalid JSON and JSON that is not
        an object all give an empty dictionary.
        """
        # TODO: make this generic; it should also be used by the
        # WLAN configuration
        try:
            with open(self.filename, "r") as f:
                d = json.load(f)
        except (OSError, ValueError):
            return dict()
        if not isinstance(d, dict):
            return dict()
        return d

    def handle_post_data(self, d):
        """Store the posted dictionary and return the confirmation page."""
        self.store_json(d)
        # The values come from an HTTP client; escape them before they
        # are placed in a page that is served as text/html
        shown = str(d)
        shown = shown.replace('&', '&amp;')
        shown = shown.replace('<', '&lt;').replace('>', '&gt;')
        return postresponse % (shown,)

    def load_URL_list(self):
        """Return the non-empty values stored under 'ch1' .. 'ch5'."""
        d = self.load_json()
        l = list()
        for i in range(5):
            dx = d.get('ch' + str(i + 1), '')
            if dx != '':
                l.append(dx)
        return l

    def load_known_networks(self):
        """Return the stored networks as a list of (ssid, pw) tuples.

        Reads the keys 'ssid0' .. 'ssid4' and 'pw0' .. 'pw4'. Entries
        with a missing or empty SSID are skipped; a missing password
        is read as ''.
        """
        l = list()
        d = self.load_json()
        # The number of iterations (5) must
        # correspond to the number of
        # fields in the HTML form
        for i in range(5):
            ssid = d.get('ssid' + str(i), '')
            if ssid == '':
                continue
            l.append((ssid, d.get('pw' + str(i), '')))
        return l


# HTML code for get and post pages
# NOTE(review): these page texts contain no markup and no form fields;
# the HTML tags appear to have been lost. The POST handling expects a
# form posted to "/" whose fields are named ssid0..ssid4 and pw0..pw4
# (see data_storage_handler.load_known_networks) - restore and confirm.
def htmlget():
    """Return the configuration page served on GET /."""
    return """ Raspberry Pico configuration

Raspberry Pico configuration

Please enter configuration info for up to 5 WLAN:

SSIDPassword
Save configuration data:
"""


postresponse = """ Configuration data stored

Configuration data stored

Unformatted presentation of data values, refine at will

%s """

# Test code
#w = wlan_config()
#s = data_storage_handler("wlanconfig")
#kn = s.load_known_networks()
#if w.connect_wlan(kn):
#    print("IP config is:",w.which_ip())
#    pass # Insert call to application code here
#else: # Configuration needed, make AP and start web server
#    print("Setting up AP")
#    w.make_ap()
#    wc = web_config(htmlget,s.handle_post_data)
#    wc.start_web_server()