channel estimation mostly done, working out bugs

This commit is contained in:
David Lenfesty 2019-08-25 20:07:21 -06:00
parent 71a6045885
commit 8b3396278a
3 changed files with 97 additions and 15 deletions

View File

@ -1,7 +1,12 @@
import numpy as np import numpy as np
import scipy
channel_response = np.array([0, 0, 0, 1, 0, 0, 0]) channel_response = np.array([0, 0, 0, 1, 0, 0, 0])
# How do I sync this across two files?
# figure it out later
pilot_value = 1 + 1j
# 15dB seems to be around the minimum for error-free transmission # 15dB seems to be around the minimum for error-free transmission
snr_db = 15 snr_db = 15
@ -21,4 +26,38 @@ def sim(in_data):
return out_data return out_data
# Again, most of this is stolen from the guide
# this sort of stuff I had no idea about before I read the guide
def estimate(in_data, pilots=0):
all_carriers = np.arange(len(in_data))
if pilots > 0:
pilot_carriers = all_carriers[::(len(all_carriers)) // pilots]
pilot_carriers = np.delete(pilot_carriers, 0)
# start averaging
H_est = 0
for i in range(len(in_data)):
H_est_pilots = in_data[i][pilots] / pilot_value
H_est_abs = scipy.interpolate.interp1d(pilot_carriers, abs(H_est_pilots), kind='linear')(all_carriers)
H_est_phase = scipy.interpolate.interp1d(pilot_carriers, np.angle(H_est_pilots), kind='linear')(all_carriers)
H_est += H_est_abs * np.exp(1j*H_est_phase)
H_est = H_est / len(in_data)
return H_est
else:
return 1
def equalize(in_data, H_est):
out_data = np.ndarray((len(in_data), len(in_data[0])), dtype=np.csingle)
for i in range(len(in_data)):
out_data[i] = in_data[i] / H_est
return out_data

17
main.py
View File

@ -12,6 +12,11 @@ TODO:
Add channel estimation via pilot carriers Add channel estimation via pilot carriers
Add some sort of payload support, i.e. be able to drop the padding at the end Add some sort of payload support, i.e. be able to drop the padding at the end
I don't believe this architecture will work too well for an FPGA, right now it's kind of hacky,
and obviously there are parallelisation improvements on an FPGA, so this will likely have to be redone.
Right now though it's just proof of concept to see if I can get a reliable signal to work.
""" """
import numpy as np import numpy as np
@ -50,7 +55,7 @@ if __name__ == '__main__':
parallel = parallelise(64, bytes) parallel = parallelise(64, bytes)
modulated = qam.modulate(parallel) modulated = qam.modulate(parallel, pilots=10)
ofdm_time = np.fft.ifft(modulated) ofdm_time = np.fft.ifft(modulated)
@ -58,13 +63,15 @@ if __name__ == '__main__':
rx = channel.sim(tx) rx = channel.sim(tx)
# Put the channel simulator stuff here
ofdm_cp_removed = cp_remove(rx, 4) ofdm_cp_removed = cp_remove(rx, 4)
to_decode = np.fft.fft(ofdm_cp_removed) to_equalize = np.fft.fft(ofdm_cp_removed)
to_serialise = qam.demodulate(to_decode) H_est = channel.estimate(to_equalize, pilots=10)
to_decode = channel.equalize(to_equalize, H_est)
to_serialise = qam.demodulate(to_decode, pilots=10)
data = serialise(64, to_serialise) data = serialise(64, to_serialise)

56
qam.py
View File

@ -1,6 +1,8 @@
import numpy as np import numpy as np
from scipy.spatial.distance import euclidean from scipy.spatial.distance import euclidean
pilot_value = 1 + 1j
qam_mapping_table = { qam_mapping_table = {
0 : 1 + 1j, 0 : 1 + 1j,
1 : -1 + 1j, 1 : -1 + 1j,
@ -10,43 +12,77 @@ qam_mapping_table = {
qam_demapping_table = { x : y for y, x in qam_mapping_table.items() } qam_demapping_table = { x : y for y, x in qam_mapping_table.items() }
def modulate(in_data): def modulate(in_data, pilots=0):
""" """
Modulates into 4-QAM encoding, might change that number later. Modulates into 4-QAM encoding, might change that number later.
Parameters: Parameters:
in_data - m X n array, m symbols to run on in_data - m X n array, m symbols to run on
pilots (optional) - number of pilot signals to intersperse into carriers
Output: Output:
data data
""" """
#initialise output array num_data_carriers = len(in_data[0])
out_data = np.ndarray((len(in_data), len(in_data[0])), dtype=np.csingle)
all_carriers = np.arange(num_data_carriers + pilots, dtype=int)
if pilots > 0:
pilot_carriers = all_carriers[::(num_data_carriers + pilots)//pilots]
pilot_carriers = np.delete(pilot_carriers, 0) # not sure how to not have this line
data_carriers = np.delete(all_carriers, pilot_carriers)
else:
data_carriers = all_carriers
print(pilot_carriers)
#initialise output array with additional pilot carriers as well
out_data = np.ndarray((len(in_data), num_data_carriers + pilots), dtype=np.csingle)
for i in range(len(in_data)): for i in range(len(in_data)):
for j in range(len(in_data[0])): data_index = 0
out_data[i][j] = qam_mapping_table[in_data[i][j]] for carrier_index in data_carriers:
out_data[i][carrier_index] = qam_mapping_table[in_data[i][data_index]]
data_index += 1
if pilots > 0:
for j in pilot_carriers:
# Value for pilot carriers
out_data[i][j] = pilot_value
return out_data return out_data
def demodulate(in_data): def demodulate(in_data, pilots=0):
out_data = np.ndarray((len(in_data), len(in_data[0])), dtype=np.uint8) all_carriers = np.arange(len(in_data[0]), dtype=int)
if pilots > 0:
pilot_carriers = all_carriers[::(len(all_carriers)) // pilots]
pilot_carriers = np.delete(pilot_carriers, 0) # not sure how to not have this line
data_carriers = np.delete(all_carriers, pilot_carriers)
else:
data_carriers = all_carriers
out_data = np.ndarray((len(in_data), len(data_carriers)), dtype=np.uint8)
# Just pull the constellation array data out # Just pull the constellation array data out
constellation = [ x for x in qam_demapping_table.keys() ] constellation = [ x for x in qam_demapping_table.keys() ]
for i in range(len(in_data)): for i in range(len(in_data)):
for j in range(len(in_data[0])): data_index = 0
for carrier_index in data_carriers:
distances = np.ndarray((len(constellation)), dtype=np.single) distances = np.ndarray((len(constellation)), dtype=np.single)
# Here we have to map to the closest constellation point, # Here we have to map to the closest constellation point,
# because floating point error # because floating point error
for k in range(len(constellation)): for k in range(len(constellation)):
distances[k] = euclidean(in_data[i][j], constellation[k]) distances[k] = euclidean(in_data[i][carrier_index], constellation[k])
# output is the index of the constellation, essentially # output is the index of the constellation, essentially
# this may have to change if I want to generalise # this may have to change if I want to generalise
out_data[i][j] = np.argmin(distances) out_data[i][data_index] = np.argmin(distances)
data_index += 1
return out_data return out_data